use std::{io, fs, thread};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use arc_swap::ArcSwap;
use daemonbase::config::ConfigPath;
use log::{debug, error};
use rpki::slurm::{SlurmFile, ValidationOutputFilters};
use serde::Deserialize;
use tokio::sync::Notify;
use crate::payload;
use crate::comms::{Gate, Link, Terminated, UnitUpdate};
use crate::manager::Component;
/// How long the update thread sleeps between two rounds of checking the
/// exception files for modifications.
const UPDATE_SLEEP: Duration = Duration::from_secs(2);
/// A unit that applies local exceptions read from SLURM files to the
/// payload provided by its source and passes the result on through its gate.
#[derive(Debug, Deserialize)]
pub struct LocalExceptions {
/// The link to the unit whose payload is to be modified.
source: Link,
/// The paths of the SLURM files holding the local exceptions.
///
/// The files are applied in the order given here.
files: Vec<ConfigPath>,
}
impl LocalExceptions {
    /// Runs the unit.
    ///
    /// Registers the gate's metrics with `component`, creates the
    /// [`ExceptionSet`] for the configured files and then loops, waiting
    /// for one of three things to happen: an update from the source, a
    /// notification that exception files have been (re-)loaded, or the
    /// gate finishing a round of processing.
    ///
    /// Whenever the source delivered new payload or the files changed, the
    /// source's current payload -- if there is any -- is run through the
    /// exception files and the result is sent out via the gate. Nothing is
    /// sent before the first notification from the exception set has
    /// arrived.
    ///
    /// Returns `Ok(())` after the source has reported `UnitUpdate::Gone`
    /// and that has been forwarded through the gate.
    pub async fn run(
        mut self, mut component: Component, mut gate: Gate
    ) -> Result<(), Terminated> {
        component.register_metrics(gate.metrics());

        let exceptions = ExceptionSet::new(
            self.files.into_iter().map(Into::into).collect()
        );

        // Flipped by the first notification from the exception set and
        // never reset afterwards.
        let mut files_loaded = false;

        loop {
            tokio::select! {
                // Poll the branches in the order they are written.
                biased;

                source_update = self.source.query() => {
                    match source_update {
                        UnitUpdate::Gone => {
                            gate.update(UnitUpdate::Gone).await;
                            return Ok(())
                        }
                        // The payload itself is picked up from the link
                        // below, so the update only acts as a trigger.
                        UnitUpdate::Payload(_payload) => { }
                        _ => continue,
                    }
                }
                _ = exceptions.notified() => {
                    files_loaded = true;
                }
                // NOTE(review): whatever `gate.process()` resolves to is
                // discarded here -- confirm this is intended if it can
                // signal that the unit should terminate.
                _ = gate.process() => {
                    continue
                }
            }

            // Only reached after a payload update or a file notification.
            match (files_loaded, self.source.payload()) {
                (true, Some(current)) => {
                    let modified = exceptions.apply(component.name(), current);
                    gate.update(UnitUpdate::Payload(modified)).await;
                }
                _ => { }
            }
        }
    }
}
/// A set of exception files that is kept up to date by a background thread.
struct ExceptionSet {
/// The data shared with the update thread.
data: Arc<ExceptionSetData>,
/// A liveness marker.
///
/// The update thread only holds a weak reference to this value and
/// returns once it can no longer upgrade it.
alive: Arc<()>,
}
impl ExceptionSet {
fn new(paths: Vec<PathBuf>) -> Self {
let res = ExceptionSet {
data: Arc::new(
ExceptionSetData {
files: paths.iter().map(|_| Default::default()).collect(),
paths,
notify: Notify::new(),
}
),
alive: Arc::new(()),
};
let data = res.data.clone();
let alive = Arc::downgrade(&res.alive);
thread::spawn(move || {
data.update_thread(alive)
});
res
}
fn apply(&self, unit: &str, update: &payload::Update) -> payload::Update {
let mut set = update.set().clone();
for (path, file) in
self.data.paths.iter().zip(self.data.files.iter())
{
set = file.load().apply(unit, path, set);
}
payload::Update::new(set)
}
async fn notified(&self) {
self.data.notify.notified().await
}
}
/// The data of an exception set shared with its update thread.
struct ExceptionSetData {
/// The paths of the exception files.
paths: Vec<PathBuf>,
/// The current content of each file, in the same order as `paths`.
files: Vec<ArcSwap<Content>>,
/// Signalled by the update thread after a round in which at least one
/// file was reloaded.
notify: Notify,
}
impl ExceptionSetData {
    /// Runs the update thread.
    ///
    /// In each round, every file is checked via [`Self::update_file`]. If
    /// at least one file was reloaded, a single notification is issued
    /// through `notify`. Failures are logged and otherwise ignored. The
    /// thread then sleeps for [`UPDATE_SLEEP`] before starting over.
    ///
    /// The method returns at the start of a round once `alive` can no
    /// longer be upgraded, i.e., once all strong references to the
    /// liveness marker are gone.
    fn update_thread(self: Arc<Self>, alive: Weak<()>) {
        // The modification time last seen for each file, `None` until the
        // file's metadata could be read for the first time.
        let mut modified = vec![None::<SystemTime>; self.paths.len()];
        loop {
            if alive.upgrade().is_none() {
                return
            }
            let mut updated = false;
            for (path, (modified, content)) in
                self.paths.iter().zip(
                    modified.iter_mut().zip(self.files.iter())
                )
            {
                match Self::update_file(path, modified, content) {
                    Ok(true) => updated = true,
                    Ok(false) => { }
                    Err(err) => {
                        error!(
                            "Failed to read SLURM file {}: {}",
                            path.display(), err
                        );
                    }
                }
            }
            if updated {
                self.notify.notify_one();
            }
            thread::sleep(UPDATE_SLEEP);
        }
    }

    /// Reloads the file at `path` if its modification time has changed.
    ///
    /// `old_modified` is the modification time seen during the previous
    /// check, if any; it is updated to the new one whenever a reload is
    /// attempted. On success, the parsed file replaces the value stored in
    /// `content`.
    ///
    /// Returns `Ok(true)` if the content was replaced and `Ok(false)` if
    /// the modification time was unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the file's metadata cannot be read or if the
    /// file cannot be opened or parsed. In the latter two cases
    /// `old_modified` has been updated nonetheless, so the failure is only
    /// reported again once the file has been modified again.
    fn update_file(
        path: &Path,
        old_modified: &mut Option<SystemTime>,
        content: &ArcSwap<Content>
    ) -> Result<bool, io::Error> {
        let new_modified = fs::metadata(path)?.modified()?;

        // Reload on any change of the timestamp rather than only when it
        // moved forward: a file replaced by one carrying an older
        // modification time (restored backup, `cp -p`, `mv`) has changed
        // just as well and must not be ignored.
        if *old_modified == Some(new_modified) {
            return Ok(false)
        }

        let slurm = fs::File::open(path).and_then(|file| {
            SlurmFile::from_reader(
                io::BufReader::new(file)
            ).map_err(Into::into)
        });

        // Remember the timestamp before looking at the outcome so a broken
        // file is not re-read in every round.
        *old_modified = Some(new_modified);
        let slurm = slurm?;

        content.store(Arc::new(slurm.into()));
        debug!("Updated Slurm file {}", path.display());
        Ok(true)
    }
}
/// The content of a single SLURM file in the form needed to apply it.
///
/// The default value has no filters and no assertions.
#[derive(Default)]
struct Content {
/// The filters deciding which payload items are to be dropped.
filters: ValidationOutputFilters,
/// The payload items to be added.
assertions: payload::Pack,
}
impl Content {
    /// Applies the filters and assertions to `set`.
    ///
    /// First, all items the filters want dropped are removed from `set`,
    /// then the assertions are inserted into what is left. The number of
    /// items added and removed is logged at debug level, using `unit` and
    /// `path` to identify where the change came from.
    fn apply(
        &self, unit: &str, path: &Path, set: payload::Set
    ) -> payload::Set {
        // Keep everything the filters do not ask to drop.
        let kept = set.filter(|item| !self.filters.drop_payload(item));
        let kept_len = kept.len();

        let mut merged = kept.to_builder();
        merged.insert_pack(self.assertions.clone());
        let result = merged.finalize();

        debug!(
            "Unit {}: file {}: added {}, removed {}.",
            unit, path.display(),
            result.len() - kept_len,
            set.len() - kept_len
        );
        result
    }
}
impl From<SlurmFile> for Content {
    /// Converts a parsed SLURM file into applicable content.
    ///
    /// The file's filters are taken over as they are while the payload of
    /// its assertions is collected into a pack.
    fn from(slurm: SlurmFile) -> Content {
        let mut pack = payload::PackBuilder::empty();
        slurm.assertions.iter_payload().for_each(|item| {
            pack.insert_unchecked(item)
        });
        Content {
            filters: slurm.filters,
            assertions: pack.finalize(),
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::payload::testrig;
    use rpki::slurm::PrefixFilter;
    use rpki::rtr::payload::Payload;

    /// Checks that `Content::apply` drops filtered items and adds the
    /// assertions.
    ///
    /// The input is the merge of a base set and a set of items to be
    /// dropped. The filters are built from the origins in the latter, so
    /// the expected output is the base set merged with the assertions.
    #[test]
    fn apply_content() {
        use rand::Rng;

        /// Builds a pack of `len` random payload items.
        fn random_pack<T: Rng>(rng: &mut T, len: usize) -> payload::Pack {
            let mut builder = payload::PackBuilder::empty();
            (0..len).for_each(|_| {
                builder.insert_unchecked(testrig::p(rng.random()))
            });
            builder.finalize()
        }

        // A fixed seed keeps the test deterministic. The three packs must
        // be drawn in exactly this order.
        let mut rng = rand_pcg::Pcg32::new(
            0xcafef00dd15ea5e5, 0xa02bdbf7bb3c0a7
        );
        let base = payload::Set::from(random_pack(&mut rng, 100));
        let dropped = payload::Set::from(random_pack(&mut rng, 10));
        let asserted = random_pack(&mut rng, 15);

        let input = base.merge(&dropped);
        let expected = base.merge(&payload::Set::from(asserted.clone()));

        let content = Content {
            filters: ValidationOutputFilters {
                // One prefix filter per origin in the set to be dropped.
                prefix: dropped.iter().filter_map(|payload| {
                    if let Payload::Origin(origin) = payload {
                        Some(PrefixFilter::new(
                            Some(origin.prefix.prefix()),
                            Some(origin.asn),
                            None
                        ))
                    }
                    else {
                        None
                    }
                }).collect(),
                bgpsec: Vec::new(),
                aspa: None,
            },
            assertions: asserted
        };

        assert_eq!(content.apply("none", Path::new("/"), input), expected);
    }
}