use std::fs::{remove_dir_all, File};
use std::io::BufReader;
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use eyre::{eyre, Context, Result};
use itertools::Itertools;
use log::{debug, trace};
use crate::{
config::{Resolution, Sampling},
mar::{MarEntry, Metadata},
metrics::MetricReportType,
network::NetworkClient,
util::zip::{zip_stream_len_empty, zip_stream_len_for_file, ZipEncoder, ZipEntryInfo},
};
use super::Manifest;
pub fn collect_and_upload(
tmp_mar_staging: &Path,
persist_mar_staging: Option<PathBuf>,
client: &impl NetworkClient,
max_zip_size: usize,
sampling: Sampling,
data_retention_start: Option<DateTime<Utc>>,
) -> Result<usize> {
let mut tmp_entries = MarEntry::iterate_from_container(tmp_mar_staging)?
.filter(|entry_result| match entry_result {
Ok(entry) => should_upload(&entry.manifest, &sampling, data_retention_start),
_ => true,
});
match persist_mar_staging {
Some(persist_path) => {
let persist_entries = MarEntry::iterate_from_container(&persist_path)?
.filter(|entry_result| match entry_result {
Ok(entry) => should_upload(&entry.manifest, &sampling, data_retention_start),
_ => true,
});
let mut entries = persist_entries.chain(tmp_entries);
upload_mar_entries(&mut entries, client, max_zip_size, |included_entries| {
trace!("Uploaded {:?} - deleting...", included_entries);
included_entries.iter().for_each(|f| {
let _ = remove_dir_all(f);
})
})
}
None => upload_mar_entries(&mut tmp_entries, client, max_zip_size, |included_entries| {
trace!("Uploaded {:?} - deleting...", included_entries);
included_entries.iter().for_each(|f| {
let _ = remove_dir_all(f);
})
}),
}
}
fn should_upload(
manifest: &Manifest,
sampling: &Sampling,
data_retention_start: Option<DateTime<Utc>>,
) -> bool {
if matches!(manifest.metadata, Metadata::LinuxReboot { .. })
|| matches!(manifest.metadata, Metadata::DeviceConfig { .. })
{
return true;
}
if let Some(data_retention_start) = data_retention_start {
if manifest.collection_time.timestamp < data_retention_start {
return false;
}
}
match &manifest.metadata {
Metadata::DeviceAttributes { .. } => sampling.monitoring_resolution >= Resolution::Normal,
Metadata::DeviceConfig { .. } => true, Metadata::ElfCoredump { .. } => sampling.debugging_resolution >= Resolution::Normal,
Metadata::LinuxHeartbeat { .. } => sampling.monitoring_resolution >= Resolution::Normal,
Metadata::LinuxMetricReport { report_type, .. } => match report_type {
MetricReportType::Heartbeat => sampling.monitoring_resolution >= Resolution::Normal,
MetricReportType::Session(_) => sampling.monitoring_resolution >= Resolution::Normal,
MetricReportType::DailyHeartbeat => sampling.monitoring_resolution >= Resolution::Low,
},
Metadata::LinuxLogs { .. } => sampling.logging_resolution >= Resolution::Normal,
Metadata::LinuxReboot { .. } => true, Metadata::LinuxCustomTrace { .. } => sampling.debugging_resolution >= Resolution::Normal,
Metadata::CustomDataRecording { .. } => sampling.debugging_resolution >= Resolution::Normal,
Metadata::Stacktrace { .. } => sampling.debugging_resolution >= Resolution::Normal,
}
}
/// Everything needed to build and then clean up after one zip file of MAR
/// entries, as produced by `gather_mar_entries_to_zip`.
pub struct MarZipContents {
/// Paths of the MAR entries whose files are part of this zip. After a
/// successful upload these are the paths handed to the upload callback.
pub entry_paths: Vec<PathBuf>,
/// Description of every file to put in the zip: the per-entry lists
/// flattened into one, in the same order as `entry_paths`.
pub zip_infos: Vec<ZipEntryInfo>,
}
/// Group MAR entries into batches that each become one zip file.
///
/// Items of `entries` that are errors, and entries whose files cannot be
/// turned into zip entry descriptions, are logged at debug level and left out
/// of the result.
///
/// Entries are kept in iteration order and are never split: consecutive
/// entries are added to the current zip for as long as its estimated stream
/// length stays within `max_zip_size`, after which a new zip is started. An
/// entry that is larger than `max_zip_size` on its own still gets a zip of its
/// own.
///
/// Returns one `MarZipContents` per zip file; the result is empty when no
/// entry could be added.
pub fn gather_mar_entries_to_zip(
    entries: &mut impl Iterator<Item = Result<MarEntry>>,
    max_zip_size: usize,
) -> Vec<MarZipContents> {
    let entry_paths_with_zip_infos =
        entries.filter_map(|entry_result| -> Option<(PathBuf, Vec<ZipEntryInfo>)> {
            let entry = match entry_result {
                Ok(entry) => entry,
                Err(e) => {
                    debug!("Invalid folder in MAR staging: {:?}", e);
                    return None;
                }
            };

            trace!("Adding {:?}", &entry.path);
            let zip_infos = Vec::<ZipEntryInfo>::try_from(&entry)
                .wrap_err_with(|| format!("Unable to add entry {}.", entry.path.display()));
            match zip_infos {
                Ok(infos) => Some((entry.path, infos)),
                Err(e) => {
                    // Report why the entry is left out instead of dropping it
                    // without a trace.
                    debug!("Skipping MAR entry: {:?}", e);
                    None
                }
            }
        });

    // `group_by` groups consecutive items that share a key. The key closure is
    // stateful: it tracks the size of the zip being filled and moves on to the
    // next key (zip index) whenever adding the current entry would exceed the
    // limit.
    let mut zip_size = zip_stream_len_empty();
    let mut zip_file_index: usize = 0;
    let grouper = entry_paths_with_zip_infos.group_by(|(_, zip_infos)| {
        let entry_zipped_size = zip_infos
            .iter()
            .map(zip_stream_len_for_file)
            .sum::<usize>();
        if zip_size + entry_zipped_size > max_zip_size {
            // Start a new zip that contains only this entry so far.
            zip_size = zip_stream_len_empty() + entry_zipped_size;
            zip_file_index += 1;
        } else {
            zip_size += entry_zipped_size;
        }
        zip_file_index
    });

    grouper
        .into_iter()
        .map(|(_zip_file_index, group)| {
            let (entry_paths, zip_infos): (Vec<PathBuf>, Vec<Vec<ZipEntryInfo>>) = group.unzip();
            MarZipContents {
                entry_paths,
                zip_infos: zip_infos
                    .into_iter()
                    .flatten()
                    .collect::<Vec<ZipEntryInfo>>(),
            }
        })
        .collect()
}
/// Describe every file of a MAR entry as a zip entry.
impl TryFrom<&MarEntry> for Vec<ZipEntryInfo> {
    type Error = eyre::Error;

    /// Build one `ZipEntryInfo` per file listed by `entry.filenames()`.
    ///
    /// Zip entries are created relative to the parent directory of the MAR
    /// entry.
    ///
    /// # Errors
    ///
    /// Stops at the first file that cannot be opened, or for which no zip
    /// entry description can be created. Also fails if the entry path has no
    /// parent directory.
    fn try_from(entry: &MarEntry) -> Result<Self> {
        // Borrowing is enough: the iterator is consumed before returning.
        let entry_path = &entry.path;
        entry
            .filenames()
            .map(|filename| {
                let path = entry_path.join(&filename);

                // Opened only to check that the file is readable; the handle
                // is dropped (closed) at the end of this statement.
                File::open(&path).wrap_err_with(|| format!("Error opening {:?}", filename))?;

                // `ok_or_else` so that the error report is only built when
                // there really is no parent directory.
                let base = entry_path
                    .parent()
                    .ok_or_else(|| eyre!("No parent directory"))?;
                ZipEntryInfo::new(path, base)
                    .wrap_err_with(|| format!("Error adding {:?}", filename))
            })
            .collect::<Result<Vec<_>>>()
    }
}
/// Group `entries` into zip files and upload them one after the other.
///
/// `callback` is called with the paths of the entries contained in a zip file
/// right after that zip file has been uploaded successfully.
///
/// Returns the number of zip files uploaded.
///
/// # Errors
///
/// Returns the error of the first upload that fails. Zip files uploaded before
/// the failure have already had `callback` invoked; the remaining ones are
/// neither uploaded nor passed to `callback`.
fn upload_mar_entries(
    entries: &mut impl Iterator<Item = Result<MarEntry>>,
    client: &impl NetworkClient,
    max_zip_size: usize,
    callback: fn(entries: Vec<PathBuf>) -> (),
) -> Result<usize> {
    let batches = gather_mar_entries_to_zip(entries, max_zip_size);
    let batch_count = batches.len();

    for batch in batches {
        let MarZipContents {
            entry_paths,
            zip_infos,
        } = batch;
        let zip_reader = BufReader::new(ZipEncoder::new(zip_infos));
        client.upload_mar_file(zip_reader)?;
        callback(entry_paths);
    }

    Ok(batch_count)
}
// Unit tests for collecting MAR entries from staging, grouping them into zip
// files, and filtering uploads by sampling resolution and data retention.
#[cfg(test)]
mod tests {
use chrono::DateTime;
use rstest::{fixture, rstest};
use std::str::FromStr;
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
use crate::reboot::{RebootReason, RebootReasonCode};
use crate::{
mar::test_utils::{assert_mar_content_matches, MarCollectorFixture},
metrics::SessionName,
network::MockNetworkClient,
};
use crate::{
metrics::{MetricStringKey, MetricValue},
test_utils::setup_logger,
};
use super::*;
// An empty staging folder yields no entries at all.
#[rstest]
fn collecting_from_empty_folder(_setup_logger: (), mar_fixture: MarCollectorFixture) {
assert_eq!(
MarEntry::iterate_from_container(&mar_fixture.tmp_mar_staging)
.unwrap()
.count(),
0
)
}
// With one empty entry and one log entry in staging, exactly one item is
// expected to be read successfully.
#[rstest]
fn collecting_from_folder_with_partial_entries(
_setup_logger: (),
mut mar_fixture: MarCollectorFixture,
) {
mar_fixture.create_empty_entry(false);
mar_fixture.create_logentry(false);
assert_eq!(
MarEntry::iterate_from_container(&mar_fixture.tmp_mar_staging)
.unwrap()
.filter(|e| e.is_ok())
.count(),
1
)
}
// Without a size limit, two log entries end up in the same zip.
#[rstest]
fn zipping_two_entries(_setup_logger: (), mut mar_fixture: MarCollectorFixture) {
mar_fixture.create_logentry(false);
mar_fixture.create_logentry(false);
let mut entries = MarEntry::iterate_from_container(&mar_fixture.tmp_mar_staging)
.expect("We should still be able to collect.");
let mars = gather_mar_entries_to_zip(&mut entries, usize::MAX);
assert_eq!(mars.len(), 1);
assert_eq!(mars[0].entry_paths.len(), 2);
// Two files per log entry (manifest.json and system.log, as listed in
// `uploading_logs`).
assert_eq!(mars[0].zip_infos.len(), 4); }
// An invalid entry is skipped and does not prevent the valid log entry from
// being zipped.
#[rstest]
#[case::not_json(MarCollectorFixture::create_entry_with_bogus_json)]
#[case::unreadable_dir(MarCollectorFixture::create_entry_without_directory_read_permission)]
#[case::unreadable_manifest(MarCollectorFixture::create_entry_without_manifest_read_permission)]
fn zipping_with_skipped_entries(
_setup_logger: (),
mut mar_fixture: MarCollectorFixture,
#[case] create_bogus_entry: fn(&mut MarCollectorFixture, bool) -> PathBuf,
) {
create_bogus_entry(&mut mar_fixture, false);
mar_fixture.create_logentry(false);
let mut entries = MarEntry::iterate_from_container(&mar_fixture.tmp_mar_staging)
.expect("We should still be able to collect.");
let mars = gather_mar_entries_to_zip(&mut entries, usize::MAX);
assert_eq!(mars.len(), 1);
assert_eq!(mars[0].entry_paths.len(), 1);
// Only the two files of the valid log entry are included.
assert_eq!(mars[0].zip_infos.len(), 2); }
// An entry with an unreadable attachment is left out as a whole, so no zip is
// produced.
#[rstest]
fn zipping_an_unreadable_attachment(_setup_logger: (), mut mar_fixture: MarCollectorFixture) {
mar_fixture.create_logentry_with_unreadable_attachment(false);
let mut entries = MarEntry::iterate_from_container(&mar_fixture.tmp_mar_staging)
.expect("We should still be able to collect.");
let mars = gather_mar_entries_to_zip(&mut entries, usize::MAX);
assert_eq!(mars.len(), 0);
}
// Entries created with half, exactly, and twice the size limit each end up in
// a zip of their own, including the one that exceeds the limit by itself.
#[rstest]
fn new_mar_when_size_limit_is_reached(_setup_logger: (), mut mar_fixture: MarCollectorFixture) {
let max_zip_size = 1024;
mar_fixture.create_logentry_with_size(max_zip_size / 2, false);
mar_fixture.create_logentry_with_size(max_zip_size, false);
mar_fixture.create_logentry_with_size(max_zip_size * 2, false);
let mut entries = MarEntry::iterate_from_container(&mar_fixture.tmp_mar_staging)
.expect("We should still be able to collect.");
let mars = gather_mar_entries_to_zip(&mut entries, max_zip_size as usize);
assert_eq!(mars.len(), 3);
for contents in mars {
assert_eq!(contents.entry_paths.len(), 1);
// One log entry per zip: two files each.
assert_eq!(contents.zip_infos.len(), 2); }
}
// Collecting and uploading from an empty staging folder succeeds.
#[rstest]
fn uploading_empty_list(
_setup_logger: (),
client: MockNetworkClient,
mar_fixture: MarCollectorFixture,
) {
collect_and_upload(
&mar_fixture.tmp_mar_staging,
None,
&client,
usize::MAX,
Sampling {
debugging_resolution: Resolution::Normal,
logging_resolution: Resolution::Normal,
monitoring_resolution: Resolution::Normal,
},
None,
)
.unwrap();
}
// Log entries are uploaded only when the logging resolution is Normal or
// higher.
#[rstest]
#[case::off(Resolution::Off, false)]
#[case::low(Resolution::Low, false)]
#[case::normal(Resolution::Normal, true)]
#[case::high(Resolution::High, true)]
fn uploading_logs(
#[case] resolution: Resolution,
#[case] should_upload: bool,
_setup_logger: (),
client: MockNetworkClient,
mut mar_fixture: MarCollectorFixture,
) {
mar_fixture.create_logentry(false);
let expected_files =
should_upload.then(|| vec!["<entry>/manifest.json", "<entry>/system.log"]);
let sampling_config = Sampling {
debugging_resolution: Resolution::Off,
logging_resolution: resolution,
monitoring_resolution: Resolution::Off,
};
upload_and_verify(mar_fixture, client, sampling_config, expected_files, None);
}
// Device attributes are uploaded only when the monitoring resolution is
// Normal or higher.
#[rstest]
#[case::off(Resolution::Off, false)]
#[case::low(Resolution::Low, false)]
#[case::normal(Resolution::Normal, true)]
#[case::high(Resolution::High, true)]
fn uploading_device_attributes(
#[case] resolution: Resolution,
#[case] should_upload: bool,
_setup_logger: (),
client: MockNetworkClient,
mut mar_fixture: MarCollectorFixture,
) {
mar_fixture.create_device_attributes_entry(vec![], SystemTime::now(), false);
let sampling_config = Sampling {
debugging_resolution: Resolution::Off,
logging_resolution: Resolution::Off,
monitoring_resolution: resolution,
};
let expected_files = should_upload.then(|| vec!["<entry>/manifest.json"]);
upload_and_verify(mar_fixture, client, sampling_config, expected_files, None);
}
// Reboot entries are uploaded at every resolution, including Off.
#[rstest]
#[case::off(Resolution::Off, true)]
#[case::low(Resolution::Low, true)]
#[case::normal(Resolution::Normal, true)]
#[case::high(Resolution::High, true)]
fn uploading_reboots(
#[case] resolution: Resolution,
#[case] should_upload: bool,
_setup_logger: (),
client: MockNetworkClient,
mut mar_fixture: MarCollectorFixture,
) {
mar_fixture.create_reboot_entry(RebootReason::Code(RebootReasonCode::Unknown), false);
let sampling_config = Sampling {
debugging_resolution: resolution,
logging_resolution: Resolution::Off,
monitoring_resolution: Resolution::Off,
};
let expected_files = should_upload.then(|| vec!["<entry>/manifest.json"]);
upload_and_verify(mar_fixture, client, sampling_config, expected_files, None);
}
// Custom data recordings are uploaded only when the debugging resolution is
// Normal or higher.
#[rstest]
#[case::off(Resolution::Off, false)]
#[case::low(Resolution::Low, false)]
#[case::normal(Resolution::Normal, true)]
#[case::high(Resolution::High, true)]
fn uploading_custom_data_recordings(
#[case] resolution: Resolution,
#[case] should_upload: bool,
_setup_logger: (),
client: MockNetworkClient,
mut mar_fixture: MarCollectorFixture,
) {
let data = vec![1, 3, 3, 7];
mar_fixture.create_custom_data_recording_entry(data, false);
let sampling_config = Sampling {
debugging_resolution: resolution,
logging_resolution: Resolution::Off,
monitoring_resolution: Resolution::Off,
};
let expected_files = should_upload.then(|| vec!["<entry>/data", "<entry>/manifest.json"]);
upload_and_verify(mar_fixture, client, sampling_config, expected_files, None);
}
// Metric reports follow the monitoring resolution: heartbeat and session
// reports need Normal or higher, daily heartbeats are already uploaded at Low.
#[rstest]
#[case::heartbeat_off(MetricReportType::Heartbeat, Resolution::Off, false)]
#[case::heartbeat_low(MetricReportType::Heartbeat, Resolution::Low, false)]
#[case::heartbeat_normal(MetricReportType::Heartbeat, Resolution::Normal, true)]
#[case::heartbeat_high(MetricReportType::Heartbeat, Resolution::High, true)]
#[case::daily_heartbeat_off(MetricReportType::DailyHeartbeat, Resolution::Off, false)]
#[case::daily_heartbeat_low(MetricReportType::DailyHeartbeat, Resolution::Low, true)]
#[case::daily_heartbeat_normal(MetricReportType::DailyHeartbeat, Resolution::Normal, true)]
#[case::daily_heartbeat_high(MetricReportType::DailyHeartbeat, Resolution::High, true)]
#[case::session_off(
MetricReportType::Session(SessionName::from_str("test").unwrap()),
Resolution::Off,
false
)]
#[case::session_low(
MetricReportType::Session(SessionName::from_str("test").unwrap()),
Resolution::Low,
false
)]
#[case::session_normal(
MetricReportType::Session(SessionName::from_str("test").unwrap()),
Resolution::Normal,
true
)]
#[case::session_high(
MetricReportType::Session(SessionName::from_str("test").unwrap()),
Resolution::High,
true
)]
fn uploading_metric_reports(
#[case] report_type: MetricReportType,
#[case] resolution: Resolution,
#[case] should_upload: bool,
_setup_logger: (),
client: MockNetworkClient,
mut mar_fixture: MarCollectorFixture,
) {
let duration = Duration::from_secs(1);
let boottime_duration = Some(Duration::from_secs(1));
let metrics: HashMap<MetricStringKey, MetricValue> = vec![(
MetricStringKey::from_str("foo").unwrap(),
MetricValue::Number(1.0),
)]
.into_iter()
.collect();
mar_fixture.create_metric_report_entry(
metrics,
duration,
boottime_duration,
report_type,
false,
);
let sampling_config = Sampling {
debugging_resolution: Resolution::Off,
logging_resolution: Resolution::Off,
monitoring_resolution: resolution,
};
let expected_files = should_upload.then(|| vec!["<entry>/manifest.json"]);
upload_and_verify(mar_fixture, client, sampling_config, expected_files, None);
}
// Data retention boundary: a log entry created one second before the
// retention start is not uploaded, one created exactly at the retention start
// is.
#[rstest]
#[case(Duration::from_secs(1), false)]
#[case(Duration::from_secs(0), true)]
fn test_upload_data_retention_time(
#[case] duration_since_yesterday: Duration,
#[case] should_upload: bool,
_setup_logger: (),
client: MockNetworkClient,
) {
let mut mar_fixture = MarCollectorFixture::new();
let now = SystemTime::now();
let yesterday = now - Duration::from_secs(24 * 60 * 60);
mar_fixture.create_logentry_with_size_and_age(
1024,
yesterday - duration_since_yesterday,
false,
);
let data_retention_start = DateTime::from(yesterday);
let sampling_config = Sampling {
debugging_resolution: Resolution::Off,
logging_resolution: Resolution::Normal,
monitoring_resolution: Resolution::Off,
};
let expected_files =
should_upload.then(|| vec!["<entry>/manifest.json", "<entry>/system.log"]);
upload_and_verify(
mar_fixture,
client,
sampling_config,
expected_files,
Some(data_retention_start),
);
}
// With a reboot entry and a log entry older than the retention start in
// staging, the upload is expected to contain a single manifest.json and no
// system.log: only the reboot entry goes out.
#[rstest]
fn test_upload_data_start_time_reboot(
_setup_logger: (),
client: MockNetworkClient,
mut mar_fixture: MarCollectorFixture,
) {
let now = SystemTime::now();
let yesterday = now - Duration::from_secs(24 * 60 * 60);
mar_fixture.create_reboot_entry(RebootReason::Code(RebootReasonCode::Unknown), false);
mar_fixture.create_logentry_with_size_and_age(
1024,
yesterday - Duration::from_secs(1),
false,
);
let data_retention_start = DateTime::from(yesterday);
let sampling_config = Sampling {
debugging_resolution: Resolution::Off,
logging_resolution: Resolution::Normal,
monitoring_resolution: Resolution::Off,
};
let expected_files = vec!["<entry>/manifest.json"];
upload_and_verify(
mar_fixture,
client,
sampling_config,
Some(expected_files),
Some(data_retention_start),
);
}
// A device config entry is uploaded while a data retention start is set and
// the monitoring and debugging resolutions are Off.
#[rstest]
fn test_upload_data_start_time_device_config(
_setup_logger: (),
client: MockNetworkClient,
mut mar_fixture: MarCollectorFixture,
) {
let now = SystemTime::now();
let yesterday = now - Duration::from_secs(24 * 60 * 60);
mar_fixture.create_device_config_entry(false);
let data_retention_start = DateTime::from(yesterday);
let sampling_config = Sampling {
debugging_resolution: Resolution::Off,
logging_resolution: Resolution::Normal,
monitoring_resolution: Resolution::Off,
};
let expected_files = vec!["<entry>/manifest.json"];
upload_and_verify(
mar_fixture,
client,
sampling_config,
Some(expected_files),
Some(data_retention_start),
);
}
// Entries from the tmp and persist staging folders are uploaded together in a
// single zip file.
#[rstest]
fn uploading_from_both_tmp_and_persist_dirs(
_setup_logger: (),
mut client: MockNetworkClient,
mut mar_fixture: MarCollectorFixture,
) {
// NOTE(review): presumably the boolean selects the persistent staging
// folder (true) or the tmp one (false) - confirm against the fixture.
mar_fixture.create_logentry(true);
mar_fixture.create_logentry(false);
client
.expect_upload_mar_file::<BufReader<ZipEncoder>>()
.withf(move |buf_reader| {
let zip_encoder = buf_reader.get_ref();
let file_names = zip_encoder.file_names();
// Two log entries, each with a manifest.json and a system.log.
assert_eq!(file_names.len(), 4);
let manifest_count = file_names
.iter()
.filter(|name| name.ends_with("manifest.json"))
.count();
let log_count = file_names
.iter()
.filter(|name| name.ends_with("system.log"))
.count();
assert_eq!(manifest_count, 2);
assert_eq!(log_count, 2);
true
})
.once()
.returning(|_| Ok(()));
let sampling_config = Sampling {
debugging_resolution: Resolution::Off,
logging_resolution: Resolution::Normal,
monitoring_resolution: Resolution::Off,
};
collect_and_upload(
&mar_fixture.tmp_mar_staging,
Some(mar_fixture.persist_mar_staging),
&client,
usize::MAX,
sampling_config,
None,
)
.unwrap();
}
// Shared helper: runs `collect_and_upload` on the fixture's tmp staging
// folder only. When `expected_files` is `Some`, exactly one upload is expected
// and its content is checked against the list; when it is `None`, no upload
// expectation is registered on the mock.
fn upload_and_verify(
mar_fixture: MarCollectorFixture,
mut client: MockNetworkClient,
sampling_config: Sampling,
expected_files: Option<Vec<&'static str>>,
data_retention_start: Option<DateTime<Utc>>,
) {
if let Some(expected_files) = expected_files {
client
.expect_upload_mar_file::<BufReader<ZipEncoder>>()
.withf(move |buf_reader| {
let zip_encoder = buf_reader.get_ref();
assert_mar_content_matches(zip_encoder, expected_files.clone())
})
.once()
.returning(|_| Ok(()));
}
collect_and_upload(
&mar_fixture.tmp_mar_staging,
None,
&client,
usize::MAX,
sampling_config,
data_retention_start,
)
.unwrap();
}
// Fixture: a mock network client without any expectation set.
#[fixture]
fn client() -> MockNetworkClient {
MockNetworkClient::default()
}
// Fixture: a fresh MAR collector fixture.
#[fixture]
fn mar_fixture() -> MarCollectorFixture {
MarCollectorFixture::new()
}
}