below_dump/
tmain.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
/// Outcome reported by [`Dumper::dump_model`] for a single time step.
///
/// `dump_timeseries` compares the value against [`IterExecResult::Skip`] to decide
/// whether a separator has to be written before the next time step.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IterExecResult {
    /// The time step was dumped; `dump_timeseries` writes the configured separator
    /// before moving on to the next step.
    Success,
    /// `dump_timeseries` writes no separator after this time step.
    /// NOTE(review): presumably the dumper emitted nothing for this step — confirm
    /// against the `Dumper` implementations.
    Skip,
}
22
23/// Dumps (a portion of) the Model to some output in specific format.
24pub trait Dumper {
25    fn dump_model(
26        &self,
27        ctx: &CommonFieldContext,
28        model: &model::Model,
29        output: &mut dyn Write,
30        round: &mut usize,
31        //  comma_flag is for JSON output, if it set to true, we will have a "," before the output JSON.
32        // This is because in case of filter and delete empty val, we have no idea if the current
33        // value is the LAST value.
34        comma_flag: bool,
35    ) -> Result<IterExecResult>;
36}
37
38/// Called by dump commands to dump Models in continuous time steps. The actual
39/// dump logic for different Models in each time step is handled by specific
40/// Dumper implementations. This function is responsible for retrieving Models
41/// and handling formatting between time steps.
42pub fn dump_timeseries(
43    mut advance: Advance,
44    time_begin: SystemTime,
45    time_end: SystemTime,
46    dumper: &dyn Dumper,
47    output: &mut dyn Write,
48    output_format: Option<OutputFormat>,
49    br: Option<String>,
50    errs: Receiver<Error>,
51) -> Result<()> {
52    let mut model = match advance.jump_sample_to(time_begin) {
53        Some(m) => m,
54        None => bail!(
55            "No initial sample could be found!\n\
56            You may have provided a time in the future or no data was recorded during the provided time. \
57            Please check your input and timezone.\n\
58            If you are using remote, please make sure the below service on target host is running."
59        ),
60    };
61
62    cliutil::check_initial_sample_time_in_time_range(model.timestamp, time_begin, time_end)?;
63
64    let json = output_format == Some(OutputFormat::Json);
65    let csv = output_format == Some(OutputFormat::Csv);
66    let openmetrics = output_format == Some(OutputFormat::OpenMetrics);
67
68    let mut round = 0;
69
70    if json {
71        write!(output, "[")?;
72    }
73
74    loop {
75        // Received external error, e.g. stop signal
76        if let Ok(e) = errs.try_recv() {
77            bail!(e);
78        }
79        let ctx = CommonFieldContext {
80            timestamp: model
81                .timestamp
82                .duration_since(SystemTime::UNIX_EPOCH)?
83                .as_secs() as i64,
84            hostname: model.system.hostname.clone(),
85        };
86        // Base on the exec result, we will determine if we need to generate the line breaker, etc
87        let comma_flag = round != 0;
88        let res = match dumper.dump_model(&ctx, &model, output, &mut round, comma_flag) {
89            Ok(res) => res,
90            Err(e) => {
91                // Swallow BrokenPipe error for write. Rust runtime will ignore SIGPIPE by default and
92                // propagating EPIPE upwards to the application in the form of an IoError::BrokenPipe.
93                if e.downcast_ref::<std::io::Error>()
94                    .map_or(false, |e| e.kind() == std::io::ErrorKind::BrokenPipe)
95                {
96                    return Ok(());
97                } else {
98                    return Err(e);
99                }
100            }
101        };
102
103        if advance.get_next_ts() > time_end {
104            break;
105        }
106
107        model = match advance.advance(Direction::Forward) {
108            Some(m) => m,
109            None => break,
110        };
111
112        if res == IterExecResult::Skip {
113            continue;
114        }
115
116        if json {
117            writeln!(output)?;
118        } else if br.is_some() && !csv {
119            writeln!(output, "{}", br.as_ref().unwrap())?;
120        }
121    }
122
123    if json {
124        write!(output, "]")?;
125    } else if openmetrics {
126        writeln!(output, "# EOF")?;
127    }
128
129    cliutil::check_final_sample_time_with_requested_time(model.timestamp, time_end);
130
131    Ok(())
132}