1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
// Copyright (c) Facebook, Inc. and its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
/// Outcome of a single `Dumper::dump_model` call.
///
/// `dump_timeseries` inspects this value after each time step to decide
/// whether a separator is written before the next step.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IterExecResult {
    /// The time step was handled normally; `dump_timeseries` goes on to write
    /// the separator (if any) for the selected output format.
    Success,
    /// `dump_timeseries` moves on to the next time step without writing a
    /// separator.
    Skip,
}
/// Dumps (a portion of) the Model to some output in a specific format.
///
/// `dump_timeseries` calls `dump_model` once per time step and handles the
/// formatting between time steps itself.
pub trait Dumper {
/// Dumps `model` for a single time step to `output`.
///
/// * `ctx` - per-time-step context; `dump_timeseries` fills it with the
///   sample timestamp (seconds since the Unix epoch) and the hostname.
/// * `model` - the Model for the current time step.
/// * `output` - destination the dump is written to.
/// * `round` - counter owned by the caller. `dump_timeseries` starts it at 0
///   and passes `comma_flag = (round != 0)`; presumably implementations
///   increment it whenever they emit a record -- TODO confirm against the
///   implementations.
/// * `comma_flag` - see the note on the parameter below.
///
/// Returns an `IterExecResult`; on `IterExecResult::Skip`, `dump_timeseries`
/// does not write a separator before the next time step. An `Err` aborts the
/// dump, except that `dump_timeseries` treats a `BrokenPipe` I/O error as a
/// normal end of output.
fn dump_model(
&self,
ctx: &CommonFieldContext,
model: &model::Model,
output: &mut dyn Write,
round: &mut usize,
// comma_flag is for JSON output: if it is set to true, a "," is expected before the output
// JSON. This is needed because, when values are filtered out or empty values are deleted,
// we cannot know whether the current value is the LAST value.
comma_flag: bool,
) -> Result<IterExecResult>;
}
/// Called by dump commands to dump Models in continuous time steps. The actual
/// dump logic for different Models in each time step is handled by specific
/// Dumper implementations. This function is responsible for retrieving Models
/// and handling formatting between time steps.
///
/// Starting from the sample that `advance.jump_sample_to(time_begin)` yields,
/// every sample is handed to `dumper` until the next sample's timestamp would
/// exceed `time_end` or `advance` has no further sample.
///
/// Formatting written by this function itself:
/// * JSON: a leading `[`, a newline between time steps and a trailing `]`.
/// * OpenMetrics: a trailing `# EOF` line.
/// * Any non-JSON, non-CSV format: `br` (when provided) followed by a newline
///   between time steps.
///
/// No separator is written after a time step for which the dumper returned
/// `IterExecResult::Skip`.
///
/// # Errors
///
/// Returns an error if no initial sample can be found, if the initial sample
/// fails the time range check, if an error is received on `errs`, if the
/// dumper fails, or if writing to `output` fails. The one exception is a
/// `BrokenPipe` I/O error returned by the dumper, which ends the dump with
/// `Ok(())`.
pub fn dump_timeseries(
    mut advance: Advance,
    time_begin: SystemTime,
    time_end: SystemTime,
    dumper: &dyn Dumper,
    output: &mut dyn Write,
    output_format: Option<OutputFormat>,
    br: Option<String>,
    errs: Receiver<Error>,
) -> Result<()> {
    let mut model = match advance.jump_sample_to(time_begin) {
        Some(m) => m,
        None => bail!(
            "No initial sample could be found!\n\
            You may have provided a time in the future or no data was recorded during the provided time. \
            Please check your input and timezone.\n\
            If you are using remote, please make sure the below service on target host is running."
        ),
    };
    cliutil::check_initial_sample_time_in_time_range(model.timestamp, time_begin, time_end)?;

    let json = output_format == Some(OutputFormat::Json);
    let csv = output_format == Some(OutputFormat::Csv);
    let openmetrics = output_format == Some(OutputFormat::OpenMetrics);
    // The user-provided line break is never emitted for CSV output. Resolve
    // that once here instead of re-checking on every time step.
    let separator = if csv { None } else { br.as_deref() };

    let mut round = 0;

    if json {
        write!(output, "[")?;
    }

    loop {
        // Received external error, e.g. stop signal
        if let Ok(e) = errs.try_recv() {
            bail!(e);
        }

        let ctx = CommonFieldContext {
            timestamp: model
                .timestamp
                .duration_since(SystemTime::UNIX_EPOCH)?
                .as_secs() as i64,
            hostname: model.system.hostname.clone(),
        };

        // For JSON output, a "," is needed before every record except the first.
        let comma_flag = round != 0;
        // Based on the exec result, we determine below whether a separator
        // has to be written after this time step.
        let res = match dumper.dump_model(&ctx, &model, output, &mut round, comma_flag) {
            Ok(res) => res,
            Err(e) => {
                // Swallow BrokenPipe error for write. Rust runtime will ignore SIGPIPE by
                // default and propagate EPIPE upwards to the application in the form of
                // an IoError::BrokenPipe.
                let broken_pipe = e
                    .downcast_ref::<std::io::Error>()
                    .map_or(false, |io_err| {
                        io_err.kind() == std::io::ErrorKind::BrokenPipe
                    });
                return if broken_pipe { Ok(()) } else { Err(e) };
            }
        };

        // Stop before fetching a sample that lies beyond the requested range.
        if advance.get_next_ts() > time_end {
            break;
        }

        model = match advance.advance(Direction::Forward) {
            Some(m) => m,
            None => break,
        };

        if res == IterExecResult::Skip {
            continue;
        }

        // NOTE(review): errors from these separator writes (including
        // BrokenPipe) are propagated to the caller, unlike errors returned
        // by the dumper above.
        if json {
            writeln!(output)?;
        } else if let Some(line_break) = separator {
            writeln!(output, "{}", line_break)?;
        }
    }

    if json {
        write!(output, "]")?;
    } else if openmetrics {
        writeln!(output, "# EOF")?;
    }

    cliutil::check_final_sample_time_with_requested_time(model.timestamp, time_end);
    Ok(())
}