Skip to main content

rom_core/
monitor.rs

1//! Monitor module for orchestrating state updates and display rendering
2use std::{
3  io::{BufRead, Write},
4  time::Duration,
5};
6
7use cognos::Host;
8use tracing::debug;
9
10use crate::{
11  cache::BuildReportCache,
12  display::{Display, DisplayConfig},
13  error::{Result, RomError},
14  state::{BuildStatus, Derivation, FailType, State, StorePath},
15  types::{Config, InputMode},
16  update,
17};
18
/// Multi-line parsing mode of the human-readable log parser.
///
/// Records whether a plan header has been seen and the indented
/// `/nix/store/` lines that follow it are still being collected.
enum HumanParserState {
  /// No listing is open; every line is interpreted on its own.
  Idle,
  /// Collecting the paths listed after a "... will be built:" header.
  PlanBuilds,
  /// Collecting the paths listed after a "... will be fetched" header.
  PlanDownloads,
}
24
/// Parser state carried between consecutive human-readable lines.
struct HumanParser {
  /// Which kind of plan listing, if any, is currently being collected.
  state:   HumanParserState,
  /// Store paths gathered from the open listing, not yet applied to the
  /// monitor state.
  pending: Vec<String>,
}
29
30impl HumanParser {
31  fn new() -> Self {
32    Self {
33      state:   HumanParserState::Idle,
34      pending: Vec::new(),
35    }
36  }
37}
38
/// Processes nix output and displays progress.
///
/// Generic over the writer `W` that the display renders to.
pub struct Monitor<W: Write> {
  /// Accumulated build/transfer state updated from the input stream.
  state:        State,
  /// Renderer that draws `state` to the writer.
  display:      Display<W>,
  /// Configuration supplied at construction; its `input_mode` selects the
  /// parser for lines that are not `@nix `-prefixed.
  config:       Config,
  /// Carry-over state for parsing multi-line human-readable output.
  human_parser: HumanParser,
}
46
47impl<W: Write> Monitor<W> {
48  /// Create a new monitor
49  pub fn new(config: Config, writer: W) -> Result<Self> {
50    let display_config = DisplayConfig {
51      show_timers:       config.show_timers,
52      max_tree_depth:    10,
53      max_visible_lines: 100,
54      use_color:         !config.piping,
55      format:            config.format,
56      legend_style:      config.legend_style,
57      summary_style:     config.summary_style,
58      icons:             crate::icons::detect(),
59    };
60
61    let display = Display::new(writer, display_config)?;
62    let mut state = State::new();
63
64    // Load build cache for predictions
65    let cache_path = BuildReportCache::default_cache_path();
66    let cache = BuildReportCache::new(cache_path);
67    state.build_cache = cache.load();
68
69    Ok(Self {
70      state,
71      display,
72      config,
73      human_parser: HumanParser::new(),
74    })
75  }
76
77  /// Process a stream of input
78  pub fn process_stream<R: BufRead>(&mut self, reader: R) -> Result<()> {
79    let mut last_render = std::time::Instant::now();
80    let render_interval = Duration::from_millis(100);
81
82    // XXX: Poll for local build completions every 200ms. It's specifically this
83    // value because NOM 200ms polling to detect builds that finished
84    // without explicit stop events. I'll probably redo this.
85    let mut last_poll = std::time::Instant::now();
86    let poll_interval = Duration::from_millis(200);
87
88    for line in reader.lines() {
89      let line = line.map_err(RomError::Io)?;
90
91      // Process the line
92      self.process_line(&line)?;
93
94      // Poll for local build completions
95      if last_poll.elapsed() >= poll_interval {
96        let now = crate::state::current_time();
97        crate::update::detect_local_completed_builds(&mut self.state, now);
98        last_poll = std::time::Instant::now();
99      }
100
101      // Render periodically
102      if last_render.elapsed() >= render_interval {
103        self.display.render(&self.state, &[])?;
104        last_render = std::time::Instant::now();
105      }
106    }
107
108    // Mark as finished and do final render
109    crate::update::finish_state(&mut self.state);
110
111    self.display.render_final(&self.state)?;
112
113    // Save build cache for future predictions
114    let cache_path = BuildReportCache::default_cache_path();
115    let cache = BuildReportCache::new(cache_path);
116    if let Err(e) = cache.save(&self.state.build_cache) {
117      debug!("Failed to save build cache: {}", e);
118      // Don't fail the build if cache save fails
119    }
120
121    // Return error code if there were failures
122    if self.state.has_errors() {
123      return Err(RomError::BuildFailed);
124    }
125
126    Ok(())
127  }
128
129  /// Process a single line of input
130  fn process_line(&mut self, line: &str) -> Result<bool> {
131    // Auto-detect format: lines starting with "@nix " are JSON
132    if line.starts_with("@nix ") {
133      self.process_json_line(line)
134    } else {
135      match self.config.input_mode {
136        InputMode::Json => self.process_json_line(line),
137        InputMode::Human => self.process_human_line(line),
138      }
139    }
140  }
141
142  /// Process a JSON-formatted line
143  fn process_json_line(&mut self, line: &str) -> Result<bool> {
144    // Nix JSON lines are prefixed with "@nix "
145    if let Some(json_str) = line.strip_prefix("@nix ") {
146      match serde_json::from_str::<cognos::Actions>(json_str) {
147        Ok(action) => {
148          // Handle message passthrough - print directly to stdout
149          if let cognos::Actions::Message { msg, .. } = &action {
150            println!("{msg}");
151          }
152
153          let changed = update::process_message(&mut self.state, action);
154          Ok(changed)
155        },
156        Err(e) => {
157          // Log parsing errors but don't fail
158          tracing::debug!("Failed to parse JSON message: {}", e);
159          Ok(false)
160        },
161      }
162    } else {
163      // Non-JSON lines in JSON mode are passed through
164      println!("{line}");
165      Ok(false)
166    }
167  }
168
  /// Interpret one line of human-readable nix output and update the state.
  ///
  /// The checks below run in a fixed order and the first one that matches
  /// wins: an open plan listing, plan headers, remote and local build
  /// starts, path copies (download/upload), builder failures, error lines,
  /// output checks, then old-style download and build messages. A line that
  /// matches nothing is printed to stdout unchanged.
  ///
  /// Returns `Ok(true)` when the line was recognized and consumed, and
  /// `Ok(false)` for blank lines and for lines passed through to stdout.
  fn process_human_line(&mut self, line: &str) -> Result<bool> {
    // Multi-line state: if we're collecting paths, check for continuation
    match self.human_parser.state {
      HumanParserState::PlanBuilds | HumanParserState::PlanDownloads => {
        // Listing entries are store paths indented by two spaces or a tab
        if line.starts_with("  /nix/store/")
          || line.starts_with("\t/nix/store/")
        {
          // Accumulate store paths during plan listing
          let path = line.trim().to_string();
          self.human_parser.pending.push(path);
          return Ok(true);
        } else {
          // Flush accumulated paths: the first non-matching line ends the
          // listing, so apply what was collected and go back to idle
          let pending = std::mem::take(&mut self.human_parser.pending);
          let is_builds =
            matches!(self.human_parser.state, HumanParserState::PlanBuilds);
          self.human_parser.state = HumanParserState::Idle;

          for path_str in pending {
            if is_builds {
              // Planned builds are registered as derivations and added to
              // the forest roots (once each)
              if let Some(drv) = crate::state::Derivation::parse(&path_str) {
                let drv_id = self.state.get_or_create_derivation_id(drv);
                self.state.update_build_status(
                  drv_id,
                  crate::state::BuildStatus::Planned,
                );
                if !self.state.forest_roots.contains(&drv_id) {
                  self.state.forest_roots.push(drv_id);
                }
              }
            } else if let Some(sp) = crate::state::StorePath::parse(&path_str) {
              // Planned downloads are recorded in the summary only
              let sp_id = self.state.get_or_create_store_path_id(sp);
              self.state.full_summary.planned_downloads.insert(sp_id);
            }
          }
          // Fall through to process current line normally
        }
      },
      HumanParserState::Idle => {},
    }

    let trimmed = line.trim();

    // Blank lines are dropped: not recognized, not echoed
    if trimmed.is_empty() {
      return Ok(false);
    }

    // Plan detection: "these N derivations will be built:" or "this derivation
    // will be built:"
    if trimmed.ends_with("derivations will be built:")
      || trimmed == "this derivation will be built:"
      || trimmed.ends_with("derivation will be built:")
    {
      self.human_parser.state = HumanParserState::PlanBuilds;
      self.human_parser.pending.clear();
      return Ok(true);
    }

    // Plan detection: "these N paths will be fetched"
    if trimmed.contains("paths will be fetched")
      || trimmed.contains("path will be fetched")
    {
      self.human_parser.state = HumanParserState::PlanDownloads;
      self.human_parser.pending.clear();
      return Ok(true);
    }

    // "building '/nix/store/....drv' on 'ssh://host'..." -> remote build
    if trimmed.starts_with("building '")
      && trimmed.contains(".drv'")
      && trimmed.contains(" on '")
      && let Some(drv_path) = extract_path_from_message(trimmed)
      && let Some(drv) = crate::state::Derivation::parse(&drv_path)
    {
      // Falls back to localhost when no host can be extracted
      let host = extract_remote_host(trimmed).unwrap_or(Host::Localhost);
      let drv_id = self.state.get_or_create_derivation_id(drv);
      let now = crate::state::current_time();
      self.state.update_build_status(
        drv_id,
        crate::state::BuildStatus::Building(crate::state::BuildInfo {
          start: now,
          host,
          estimate: None,
          activity_id: None,
        }),
      );
      return Ok(true);
    }

    // "building '/nix/store/....drv'..." -> local build
    // (lines with an " on '" host were already consumed by the check above)
    if (trimmed.starts_with("building") || trimmed.contains("building '"))
      && let Some(drv_path) = extract_path_from_message(trimmed)
      && let Some(drv) = crate::state::Derivation::parse(&drv_path)
    {
      let drv_id = self.state.get_or_create_derivation_id(drv);
      let now = crate::state::current_time();
      self.state.update_build_status(
        drv_id,
        crate::state::BuildStatus::Building(crate::state::BuildInfo {
          start:       now,
          host:        Host::Localhost,
          estimate:    None,
          activity_id: None,
        }),
      );
      return Ok(true);
    }

    // "copying path '/nix/store/...' from 'ssh://...'..." -> download
    if trimmed.starts_with("copying path '")
      && trimmed.contains("' from '")
      && let Some(path_str) = extract_path_from_message(trimmed)
      && let Some(path) = crate::state::StorePath::parse(&path_str)
    {
      let path_id = self.state.get_or_create_store_path_id(path);
      let now = crate::state::current_time();
      let host =
        extract_remote_host_after(trimmed, "from '").unwrap_or(Host::Localhost);
      // Human-readable output carries no activity id or byte counts, so the
      // transfer starts with zero/unknown values
      self.state.full_summary.running_downloads.insert(
        path_id,
        crate::state::TransferInfo {
          start: now,
          host,
          activity_id: 0,
          bytes_transferred: 0,
          total_bytes: None,
        },
      );
      return Ok(true);
    }

    // "copying path '/nix/store/...' to 'ssh://...'..." -> upload
    if trimmed.starts_with("copying path '")
      && trimmed.contains("' to '")
      && let Some(path_str) = extract_path_from_message(trimmed)
      && let Some(path) = crate::state::StorePath::parse(&path_str)
    {
      let path_id = self.state.get_or_create_store_path_id(path);
      let now = crate::state::current_time();
      let host =
        extract_remote_host_after(trimmed, "to '").unwrap_or(Host::Localhost);
      self.state.full_summary.running_uploads.insert(
        path_id,
        crate::state::TransferInfo {
          start: now,
          host,
          activity_id: 0,
          bytes_transferred: 0,
          total_bytes: None,
        },
      );
      return Ok(true);
    }

    // "builder for '/nix/store/....drv' failed with exit code N"
    if trimmed.starts_with("builder for '")
      && trimmed.contains("failed with exit code")
      && let Some(drv_path) = extract_path_from_message(trimmed)
      && let Some(drv) = crate::state::Derivation::parse(&drv_path)
    {
      // An unparsable exit code degrades to an unknown failure type
      let exit_code = extract_exit_code(trimmed);
      let fail_type =
        exit_code.map_or(FailType::Unknown, FailType::BuildFailed);
      let drv_id = self.state.get_or_create_derivation_id(drv);
      let now = crate::state::current_time();
      // Reuse the info of the running build when there is one; otherwise
      // synthesize a local build that started just now
      let build_info = self
        .state
        .get_derivation_info(drv_id)
        .and_then(|info| {
          if let crate::state::BuildStatus::Building(b) = &info.build_status {
            Some(b.clone())
          } else {
            None
          }
        })
        .unwrap_or(crate::state::BuildInfo {
          start:       now,
          host:        Host::Localhost,
          estimate:    None,
          activity_id: None,
        });
      self.state.update_build_status(
        drv_id,
        crate::state::BuildStatus::Failed {
          info: build_info,
          fail: crate::state::BuildFail { at: now, fail_type },
        },
      );
      return Ok(true);
    }

    // "error: hash mismatch" lines: recorded as an error, without the
    // derivation lookup done for general error lines below
    if trimmed.starts_with("error:") && trimmed.contains("hash mismatch") {
      self.state.nix_errors.push(trimmed.to_string());
      return Ok(true);
    }

    // General error lines ("error:" anywhere in the line)
    if trimmed.starts_with("error:") || trimmed.contains("error:") {
      self.state.nix_errors.push(trimmed.to_string());

      // If the error names a derivation that is already known and currently
      // building, mark that build as failed with an unknown failure type
      if let Some(drv_path) = extract_path_from_message(trimmed)
        && let Some(drv) = crate::state::Derivation::parse(&drv_path)
        && let Some(&drv_id) = self.state.derivation_ids.get(&drv)
        && let Some(info) = self.state.get_derivation_info(drv_id)
        && let crate::state::BuildStatus::Building(build_info) =
          &info.build_status
      {
        let now = crate::state::current_time();
        self.state.update_build_status(
          drv_id,
          crate::state::BuildStatus::Failed {
            info: build_info.clone(),
            fail: crate::state::BuildFail {
              at:        now,
              fail_type: FailType::Unknown,
            },
          },
        );
      }

      return Ok(true);
    }

    // "checking outputs of '/nix/store/....drv'..."
    // Only registers the derivation and marks it as touched
    if trimmed.contains("checking outputs of")
      && let Some(drv_path) = extract_path_from_message(trimmed)
      && let Some(drv) = crate::state::Derivation::parse(&drv_path)
    {
      let drv_id = self.state.get_or_create_derivation_id(drv);
      self.state.touched_ids.insert(drv_id);
      return Ok(true);
    }

    // Detect downloads (old-style)
    if (trimmed.starts_with("downloading") || trimmed.contains("downloading '"))
      && let Some(path_str) = extract_path_from_message(trimmed)
      && let Some(path) = crate::state::StorePath::parse(&path_str)
    {
      let path_id = self.state.get_or_create_store_path_id(path);
      let now = crate::state::current_time();
      // A size mentioned in the line, if any, becomes the expected total
      let total_bytes = extract_byte_size(trimmed);
      self.state.full_summary.running_downloads.insert(
        path_id,
        crate::state::TransferInfo {
          start: now,
          host: Host::Localhost,
          activity_id: 0,
          bytes_transferred: 0,
          total_bytes,
        },
      );
      return Ok(true);
    }

    // Detect download completions
    // Only paths already known to the state match; others fall through
    if (trimmed.starts_with("downloaded") || trimmed.contains("downloaded '"))
      && let Some(path_str) = extract_path_from_message(trimmed)
      && let Some(path) = StorePath::parse(&path_str)
      && let Some(&path_id) = self.state.store_path_ids.get(&path)
    {
      let now = crate::state::current_time();
      let total_bytes = extract_byte_size(trimmed).unwrap_or(0);
      // Keep the start time of the running download when one was recorded
      let start = self
        .state
        .full_summary
        .running_downloads
        .get(&path_id)
        .map_or(now, |t| t.start);
      let completed = crate::state::CompletedTransferInfo {
        start,
        end: now,
        host: Host::Localhost,
        total_bytes,
      };
      self.state.full_summary.running_downloads.remove(&path_id);
      self
        .state
        .full_summary
        .completed_downloads
        .insert(path_id, completed);
      return Ok(true);
    }

    // Detect build completions (old-style)
    // Only derivations that are known and currently building match
    if (trimmed.starts_with("built") || trimmed.contains("built '"))
      && let Some(drv_path) = extract_path_from_message(trimmed)
      && let Some(drv) = Derivation::parse(&drv_path)
      && let Some(&drv_id) = self.state.derivation_ids.get(&drv)
      && let Some(info) = self.state.get_derivation_info(drv_id)
      && let BuildStatus::Building(build_info) = &info.build_status
    {
      let now = crate::state::current_time();
      self.state.update_build_status(drv_id, BuildStatus::Built {
        info: build_info.clone(),
        end:  now,
      });
      return Ok(true);
    }

    // Passthrough: unrecognized lines go to stdout
    println!("{line}");
    Ok(false)
  }
474
  /// Borrow the state accumulated so far.
  pub const fn state(&self) -> &State {
    &self.state
  }
479
  /// Mutably borrow the state accumulated so far, allowing callers to
  /// modify it directly.
  pub const fn state_mut(&mut self) -> &mut State {
    &mut self.state
  }
484}
485
/// Extract a remote host from "... on 'ssh://host'..." pattern.
///
/// Delegates to `extract_remote_host_after` with the marker `on '`, so it
/// returns `None` when that marker or the closing quote is absent.
fn extract_remote_host(line: &str) -> Option<Host> {
  extract_remote_host_after(line, "on '")
}
490
491/// Extract a remote host from a pattern like "from 'ssh://host'" or "to
492/// 'ssh://host'"
493fn extract_remote_host_after(line: &str, marker: &str) -> Option<Host> {
494  let pos = line.find(marker)?;
495  let after = &line[pos + marker.len()..];
496  let end = after.find('\'')?;
497  let raw = &after[..end];
498  let name = raw
499    .strip_prefix("ssh://")
500    .or_else(|| raw.strip_prefix("https://"))
501    .or_else(|| raw.strip_prefix("http://"))
502    .unwrap_or(raw)
503    .trim_end_matches('/');
504  if name.is_empty() || name == "localhost" {
505    Some(Host::Localhost)
506  } else {
507    Some(Host::Remote(name.to_string()))
508  }
509}
510
/// Parse the number that follows the first "exit code" in `line`.
///
/// Whitespace after the phrase is skipped and the leading run of ASCII
/// digits is parsed as an `i32`. Returns `None` when the phrase is missing,
/// no digit follows it directly, or the number does not fit in an `i32`.
fn extract_exit_code(line: &str) -> Option<i32> {
  let (_, tail) = line.split_once("exit code")?;
  let tail = tail.trim_start();
  // ASCII digits are single bytes, so the count is a valid slice boundary.
  let digit_count = tail.bytes().take_while(u8::is_ascii_digit).count();
  tail[..digit_count].parse().ok()
}
518
/// Pull a path out of a log message.
///
/// If the line contains at least two `'` characters, the text between the
/// first two is returned as-is (it is not checked to be a store path).
/// Otherwise the first whitespace-separated word starting with
/// `/nix/store/` is returned, with leading and trailing characters other
/// than ASCII alphanumerics, `/`, `-` and `.` trimmed off. Returns `None`
/// when neither form is present.
pub fn extract_path_from_message(line: &str) -> Option<String> {
  // Quoted form: before-quote, quoted text, rest.
  let mut pieces = line.splitn(3, '\'');
  if let (Some(_), Some(quoted), Some(_)) =
    (pieces.next(), pieces.next(), pieces.next())
  {
    return Some(quoted.to_string());
  }

  // Unquoted form: first word that looks like a store path.
  let is_path_char = |c: char| {
    c.is_ascii_alphanumeric() || matches!(c, '/' | '-' | '.')
  };
  line
    .split_whitespace()
    .find(|word| word.starts_with("/nix/store/"))
    .map(|word| word.trim_matches(|c: char| !is_path_char(c)).to_string())
}
543
/// Extract a byte size from a message line (e.g., "downloaded 123 KiB").
///
/// Scans adjacent whitespace-separated word pairs for a number followed by
/// one of the units `B`, `KiB`, `MiB`, `GiB`, `TiB` or `PiB`, and returns
/// the first such size converted to bytes (fractions are truncated).
///
/// Numbers that are not finite or are negative (`inf`, `NaN`, `-1`, which
/// `f64` parsing accepts) are not valid sizes and are skipped. Returns
/// `None` when no valid size is found.
pub fn extract_byte_size(line: &str) -> Option<u64> {
  let numbers = line.split_whitespace();
  let units = line.split_whitespace().skip(1);

  numbers.zip(units).find_map(|(number, unit)| {
    let multiplier: u64 = match unit {
      "B" => 1,
      "KiB" => 1 << 10,
      "MiB" => 1 << 20,
      "GiB" => 1 << 30,
      "TiB" => 1 << 40,
      "PiB" => 1 << 50,
      _ => return None,
    };

    let value = number.parse::<f64>().ok()?;
    if !value.is_finite() || value < 0.0 {
      return None;
    }

    // Float-to-int `as` truncates the fraction and saturates at `u64::MAX`.
    Some((value * multiplier as f64) as u64)
  })
}