use std::{
io::{BufRead, Write},
time::Duration,
};
use cognos::Host;
use tracing::debug;
use crate::{
cache::BuildReportCache,
display::{Display, DisplayConfig},
error::{Result, RomError},
state::{BuildStatus, Derivation, FailType, State, StorePath},
types::{Config, InputMode},
update,
};
enum HumanParserState {
Idle,
PlanBuilds,
PlanDownloads,
}
struct HumanParser {
state: HumanParserState,
pending: Vec<String>,
}
impl HumanParser {
fn new() -> Self {
Self {
state: HumanParserState::Idle,
pending: Vec::new(),
}
}
}
pub struct Monitor<W: Write> {
state: State,
display: Display<W>,
config: Config,
human_parser: HumanParser,
}
impl<W: Write> Monitor<W> {
pub fn new(config: Config, writer: W) -> Result<Self> {
let display_config = DisplayConfig {
show_timers: config.show_timers,
max_tree_depth: 10,
max_visible_lines: 100,
use_color: !config.piping,
format: config.format,
legend_style: config.legend_style,
summary_style: config.summary_style,
icons: crate::icons::detect(),
};
let display = Display::new(writer, display_config)?;
let mut state = State::new();
let cache_path = BuildReportCache::default_cache_path();
let cache = BuildReportCache::new(cache_path);
state.build_cache = cache.load();
Ok(Self {
state,
display,
config,
human_parser: HumanParser::new(),
})
}
pub fn process_stream<R: BufRead>(&mut self, reader: R) -> Result<()> {
let mut last_render = std::time::Instant::now();
let render_interval = Duration::from_millis(100);
let mut last_poll = std::time::Instant::now();
let poll_interval = Duration::from_millis(200);
for line in reader.lines() {
let line = line.map_err(RomError::Io)?;
self.process_line(&line)?;
if last_poll.elapsed() >= poll_interval {
let now = crate::state::current_time();
crate::update::detect_local_completed_builds(&mut self.state, now);
last_poll = std::time::Instant::now();
}
if last_render.elapsed() >= render_interval {
self.display.render(&self.state, &[])?;
last_render = std::time::Instant::now();
}
}
crate::update::finish_state(&mut self.state);
self.display.render_final(&self.state)?;
let cache_path = BuildReportCache::default_cache_path();
let cache = BuildReportCache::new(cache_path);
if let Err(e) = cache.save(&self.state.build_cache) {
debug!("Failed to save build cache: {}", e);
}
if self.state.has_errors() {
return Err(RomError::BuildFailed);
}
Ok(())
}
fn process_line(&mut self, line: &str) -> Result<bool> {
if line.starts_with("@nix ") {
self.process_json_line(line)
} else {
match self.config.input_mode {
InputMode::Json => self.process_json_line(line),
InputMode::Human => self.process_human_line(line),
}
}
}
fn process_json_line(&mut self, line: &str) -> Result<bool> {
if let Some(json_str) = line.strip_prefix("@nix ") {
match serde_json::from_str::<cognos::Actions>(json_str) {
Ok(action) => {
if let cognos::Actions::Message { msg, .. } = &action {
println!("{msg}");
}
let changed = update::process_message(&mut self.state, action);
Ok(changed)
},
Err(e) => {
tracing::debug!("Failed to parse JSON message: {}", e);
Ok(false)
},
}
} else {
println!("{line}");
Ok(false)
}
}
fn process_human_line(&mut self, line: &str) -> Result<bool> {
match self.human_parser.state {
HumanParserState::PlanBuilds | HumanParserState::PlanDownloads => {
if line.starts_with(" /nix/store/")
|| line.starts_with("\t/nix/store/")
{
let path = line.trim().to_string();
self.human_parser.pending.push(path);
return Ok(true);
} else {
let pending = std::mem::take(&mut self.human_parser.pending);
let is_builds =
matches!(self.human_parser.state, HumanParserState::PlanBuilds);
self.human_parser.state = HumanParserState::Idle;
for path_str in pending {
if is_builds {
if let Some(drv) = crate::state::Derivation::parse(&path_str) {
let drv_id = self.state.get_or_create_derivation_id(drv);
self.state.update_build_status(
drv_id,
crate::state::BuildStatus::Planned,
);
if !self.state.forest_roots.contains(&drv_id) {
self.state.forest_roots.push(drv_id);
}
}
} else if let Some(sp) = crate::state::StorePath::parse(&path_str) {
let sp_id = self.state.get_or_create_store_path_id(sp);
self.state.full_summary.planned_downloads.insert(sp_id);
}
}
}
},
HumanParserState::Idle => {},
}
let trimmed = line.trim();
if trimmed.is_empty() {
return Ok(false);
}
if trimmed.ends_with("derivations will be built:")
|| trimmed == "this derivation will be built:"
|| trimmed.ends_with("derivation will be built:")
{
self.human_parser.state = HumanParserState::PlanBuilds;
self.human_parser.pending.clear();
return Ok(true);
}
if trimmed.contains("paths will be fetched")
|| trimmed.contains("path will be fetched")
{
self.human_parser.state = HumanParserState::PlanDownloads;
self.human_parser.pending.clear();
return Ok(true);
}
if trimmed.starts_with("building '")
&& trimmed.contains(".drv'")
&& trimmed.contains(" on '")
&& let Some(drv_path) = extract_path_from_message(trimmed)
&& let Some(drv) = crate::state::Derivation::parse(&drv_path)
{
let host = extract_remote_host(trimmed).unwrap_or(Host::Localhost);
let drv_id = self.state.get_or_create_derivation_id(drv);
let now = crate::state::current_time();
self.state.update_build_status(
drv_id,
crate::state::BuildStatus::Building(crate::state::BuildInfo {
start: now,
host,
estimate: None,
activity_id: None,
}),
);
return Ok(true);
}
if (trimmed.starts_with("building") || trimmed.contains("building '"))
&& let Some(drv_path) = extract_path_from_message(trimmed)
&& let Some(drv) = crate::state::Derivation::parse(&drv_path)
{
let drv_id = self.state.get_or_create_derivation_id(drv);
let now = crate::state::current_time();
self.state.update_build_status(
drv_id,
crate::state::BuildStatus::Building(crate::state::BuildInfo {
start: now,
host: Host::Localhost,
estimate: None,
activity_id: None,
}),
);
return Ok(true);
}
if trimmed.starts_with("copying path '")
&& trimmed.contains("' from '")
&& let Some(path_str) = extract_path_from_message(trimmed)
&& let Some(path) = crate::state::StorePath::parse(&path_str)
{
let path_id = self.state.get_or_create_store_path_id(path);
let now = crate::state::current_time();
let host =
extract_remote_host_after(trimmed, "from '").unwrap_or(Host::Localhost);
self.state.full_summary.running_downloads.insert(
path_id,
crate::state::TransferInfo {
start: now,
host,
activity_id: 0,
bytes_transferred: 0,
total_bytes: None,
},
);
return Ok(true);
}
if trimmed.starts_with("copying path '")
&& trimmed.contains("' to '")
&& let Some(path_str) = extract_path_from_message(trimmed)
&& let Some(path) = crate::state::StorePath::parse(&path_str)
{
let path_id = self.state.get_or_create_store_path_id(path);
let now = crate::state::current_time();
let host =
extract_remote_host_after(trimmed, "to '").unwrap_or(Host::Localhost);
self.state.full_summary.running_uploads.insert(
path_id,
crate::state::TransferInfo {
start: now,
host,
activity_id: 0,
bytes_transferred: 0,
total_bytes: None,
},
);
return Ok(true);
}
if trimmed.starts_with("builder for '")
&& trimmed.contains("failed with exit code")
&& let Some(drv_path) = extract_path_from_message(trimmed)
&& let Some(drv) = crate::state::Derivation::parse(&drv_path)
{
let exit_code = extract_exit_code(trimmed);
let fail_type =
exit_code.map_or(FailType::Unknown, FailType::BuildFailed);
let drv_id = self.state.get_or_create_derivation_id(drv);
let now = crate::state::current_time();
let build_info = self
.state
.get_derivation_info(drv_id)
.and_then(|info| {
if let crate::state::BuildStatus::Building(b) = &info.build_status {
Some(b.clone())
} else {
None
}
})
.unwrap_or(crate::state::BuildInfo {
start: now,
host: Host::Localhost,
estimate: None,
activity_id: None,
});
self.state.update_build_status(
drv_id,
crate::state::BuildStatus::Failed {
info: build_info,
fail: crate::state::BuildFail { at: now, fail_type },
},
);
return Ok(true);
}
if trimmed.starts_with("error:") && trimmed.contains("hash mismatch") {
self.state.nix_errors.push(trimmed.to_string());
return Ok(true);
}
if trimmed.starts_with("error:") || trimmed.contains("error:") {
self.state.nix_errors.push(trimmed.to_string());
if let Some(drv_path) = extract_path_from_message(trimmed)
&& let Some(drv) = crate::state::Derivation::parse(&drv_path)
&& let Some(&drv_id) = self.state.derivation_ids.get(&drv)
&& let Some(info) = self.state.get_derivation_info(drv_id)
&& let crate::state::BuildStatus::Building(build_info) =
&info.build_status
{
let now = crate::state::current_time();
self.state.update_build_status(
drv_id,
crate::state::BuildStatus::Failed {
info: build_info.clone(),
fail: crate::state::BuildFail {
at: now,
fail_type: FailType::Unknown,
},
},
);
}
return Ok(true);
}
if trimmed.contains("checking outputs of")
&& let Some(drv_path) = extract_path_from_message(trimmed)
&& let Some(drv) = crate::state::Derivation::parse(&drv_path)
{
let drv_id = self.state.get_or_create_derivation_id(drv);
self.state.touched_ids.insert(drv_id);
return Ok(true);
}
if (trimmed.starts_with("downloading") || trimmed.contains("downloading '"))
&& let Some(path_str) = extract_path_from_message(trimmed)
&& let Some(path) = crate::state::StorePath::parse(&path_str)
{
let path_id = self.state.get_or_create_store_path_id(path);
let now = crate::state::current_time();
let total_bytes = extract_byte_size(trimmed);
self.state.full_summary.running_downloads.insert(
path_id,
crate::state::TransferInfo {
start: now,
host: Host::Localhost,
activity_id: 0,
bytes_transferred: 0,
total_bytes,
},
);
return Ok(true);
}
if (trimmed.starts_with("downloaded") || trimmed.contains("downloaded '"))
&& let Some(path_str) = extract_path_from_message(trimmed)
&& let Some(path) = StorePath::parse(&path_str)
&& let Some(&path_id) = self.state.store_path_ids.get(&path)
{
let now = crate::state::current_time();
let total_bytes = extract_byte_size(trimmed).unwrap_or(0);
let start = self
.state
.full_summary
.running_downloads
.get(&path_id)
.map_or(now, |t| t.start);
let completed = crate::state::CompletedTransferInfo {
start,
end: now,
host: Host::Localhost,
total_bytes,
};
self.state.full_summary.running_downloads.remove(&path_id);
self
.state
.full_summary
.completed_downloads
.insert(path_id, completed);
return Ok(true);
}
if (trimmed.starts_with("built") || trimmed.contains("built '"))
&& let Some(drv_path) = extract_path_from_message(trimmed)
&& let Some(drv) = Derivation::parse(&drv_path)
&& let Some(&drv_id) = self.state.derivation_ids.get(&drv)
&& let Some(info) = self.state.get_derivation_info(drv_id)
&& let BuildStatus::Building(build_info) = &info.build_status
{
let now = crate::state::current_time();
self.state.update_build_status(drv_id, BuildStatus::Built {
info: build_info.clone(),
end: now,
});
return Ok(true);
}
println!("{line}");
Ok(false)
}
pub const fn state(&self) -> &State {
&self.state
}
pub const fn state_mut(&mut self) -> &mut State {
&mut self.state
}
}
fn extract_remote_host(line: &str) -> Option<Host> {
extract_remote_host_after(line, "on '")
}
fn extract_remote_host_after(line: &str, marker: &str) -> Option<Host> {
let pos = line.find(marker)?;
let after = &line[pos + marker.len()..];
let end = after.find('\'')?;
let raw = &after[..end];
let name = raw
.strip_prefix("ssh://")
.or_else(|| raw.strip_prefix("https://"))
.or_else(|| raw.strip_prefix("http://"))
.unwrap_or(raw)
.trim_end_matches('/');
if name.is_empty() || name == "localhost" {
Some(Host::Localhost)
} else {
Some(Host::Remote(name.to_string()))
}
}
fn extract_exit_code(line: &str) -> Option<i32> {
let pos = line.find("exit code")?;
let after = &line[pos + "exit code".len()..];
let trimmed = after.trim_start();
let code_str = trimmed.split(|c: char| !c.is_ascii_digit()).next()?;
code_str.parse().ok()
}
pub fn extract_path_from_message(line: &str) -> Option<String> {
if let Some(start) = line.find('\'')
&& let Some(end) = line[start + 1..].find('\'')
{
return Some(line[start + 1..start + 1 + end].to_string());
}
for word in line.split_whitespace() {
if word.starts_with("/nix/store/") {
return Some(
word
.trim_matches(|c: char| {
!c.is_ascii_alphanumeric() && c != '/' && c != '-' && c != '.'
})
.to_string(),
);
}
}
None
}
pub fn extract_byte_size(line: &str) -> Option<u64> {
let words: Vec<&str> = line.split_whitespace().collect();
for (i, word) in words.iter().enumerate() {
if i + 1 < words.len() {
let unit = words[i + 1];
if matches!(unit, "B" | "KiB" | "MiB" | "GiB" | "TiB" | "PiB")
&& let Ok(value) = word.parse::<f64>()
{
let multiplier = match unit {
"B" => 1_u64,
"KiB" => 1024,
"MiB" => 1024 * 1024,
"GiB" => 1024 * 1024 * 1024,
"TiB" => 1024_u64 * 1024 * 1024 * 1024,
"PiB" => 1024_u64 * 1024 * 1024 * 1024 * 1024,
_ => 1,
};
return Some((value * multiplier as f64) as u64);
}
}
}
None
}