use std::{cmp, ops};
use std::collections::VecDeque;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
use chrono::{DateTime, Utc};
use log::info;
use rpki::rtr::{Serial, State, Timing};
use rpki::rtr::server::PayloadSource;
use crate::config::{Config, FilterPolicy};
use crate::metrics::Metrics;
use crate::slurm::LocalExceptions;
use super::delta::{DeltaArcIter, PayloadDelta};
use super::snapshot::{PayloadSnapshot, SnapshotArcIter};
use super::validation::ValidationReport;
/// A [`PayloadHistory`] wrapped in an `Arc<RwLock<_>>`.
///
/// Cloning the value clones the `Arc`, so all clones refer to the same
/// underlying history.
#[derive(Clone, Debug)]
pub struct SharedHistory(Arc<RwLock<PayloadHistory>>);
impl SharedHistory {
    /// Creates a new shared history from the given configuration.
    pub fn from_config(config: &Config) -> Self {
        let history = PayloadHistory::from_config(config);
        SharedHistory(Arc::new(RwLock::new(history)))
    }

    /// Acquires the read lock and returns a guard for the history.
    ///
    /// # Panics
    ///
    /// Panics if the lock has been poisoned.
    pub fn read(&self) -> impl ops::Deref<Target = PayloadHistory> + '_ {
        self.0.read().expect("Payload history lock poisoned")
    }

    /// Acquires the write lock and returns a guard for the history.
    ///
    /// # Panics
    ///
    /// Panics if the lock has been poisoned.
    fn write(&self) -> impl ops::DerefMut<Target = PayloadHistory> + '_ {
        self.0.write().expect("Payload history lock poisoned")
    }

    /// Installs the outcome of a validation run as the new current data.
    ///
    /// The report is converted into a snapshot. If a snapshot was already
    /// present, a delta from it to the new one is constructed and, if one
    /// results, pushed onto the history. The metrics and the new snapshot
    /// are stored in either case.
    ///
    /// Returns `true` if a delta was pushed or if there was no snapshot
    /// before this call, `false` otherwise.
    pub fn update(
        &self,
        report: ValidationReport,
        exceptions: &LocalExceptions,
        mut metrics: Metrics
    ) -> bool {
        let snapshot = report.into_snapshot(exceptions, &mut metrics);

        // Only hold the read lock long enough to grab what delta
        // construction needs; the construction itself runs unlocked.
        let (previous, serial) = {
            let history = self.read();
            (history.current(), history.serial())
        };

        let delta = match previous.as_ref() {
            Some(previous) => {
                PayloadDelta::construct(previous, &snapshot, serial)
            }
            None => None,
        };
        let changed = delta.is_some() || previous.is_none();

        let mut history = self.write();
        history.metrics = Some(metrics.into());
        if let Some(delta) = delta {
            info!(
                "Delta with {} announced and {} withdrawn items.",
                delta.announce_len(),
                delta.withdraw_len(),
            );
            history.push_delta(delta);
        }
        history.current = Some(snapshot.into());
        changed
    }

    /// Records the current time as the start of an update run.
    pub fn mark_update_start(&self) {
        let now = Utc::now();
        self.write().last_update_start = now;
    }

    /// Records that an update run has finished and schedules the next one.
    ///
    /// Stores the completion time and the run's duration, sets the next
    /// update start to the configured refresh interval from now or to the
    /// current snapshot's refresh time if that is earlier, and advances the
    /// creation time stamp.
    pub fn mark_update_done(&self) {
        let mut guard = self.write();
        // Work on a plain reference so fields can be used independently.
        let history: &mut PayloadHistory = &mut *guard;
        let now = Utc::now();

        history.last_update_done = Some(now);
        let elapsed = now
            .signed_duration_since(history.last_update_start)
            .to_std()
            .unwrap_or_else(|_| Duration::from_secs(0));
        history.last_update_duration = Some(elapsed);

        let scheduled = SystemTime::now() + history.refresh;
        let snapshot_refresh = history
            .current
            .as_ref()
            .and_then(|snapshot| snapshot.refresh())
            .map(SystemTime::from);
        history.next_update_start = match snapshot_refresh {
            Some(earlier) if earlier < scheduled => earlier,
            _ => scheduled,
        };

        // The creation time must move forward by at least one second per
        // update. Because of that bump it may already be ahead of the
        // clock, in which case it is bumped again rather than set to `now`.
        history.created = Some(match history.created {
            Some(previous) if now.timestamp() <= previous.timestamp() => {
                previous + chrono::Duration::try_seconds(1).unwrap()
            }
            _ => now,
        });
    }
}
impl PayloadSource for SharedHistory {
type Set = SnapshotArcIter;
type Diff = DeltaArcIter;
fn ready(&self) -> bool {
self.read().is_active()
}
fn notify(&self) -> State {
let read = self.read();
State::from_parts(read.rtr_session(), read.serial())
}
fn full(&self) -> (State, Self::Set) {
let read = self.read();
(
State::from_parts(read.rtr_session(), read.serial()),
read.current.clone().unwrap_or_default().arc_iter(),
)
}
fn diff(&self, state: State) -> Option<(State, Self::Diff)> {
let read = self.read();
if read.rtr_session() != state.session() {
return None
}
read.delta_since(state.serial()).map(|delta| {
(
State::from_parts(read.rtr_session(), read.serial()),
delta.arc_iter(),
)
})
}
fn timing(&self) -> Timing {
let read = self.read();
let mut res = read.timing;
res.refresh = u32::try_from(
read.update_wait().as_secs()
).unwrap_or(u32::MAX);
res
}
}
/// The current payload snapshot together with a bounded list of deltas and
/// bookkeeping about update runs.
#[derive(Clone, Debug)]
pub struct PayloadHistory {
/// The most recent snapshot; `None` until the first update is installed.
current: Option<Arc<PayloadSnapshot>>,
/// The retained deltas, newest at the front and oldest at the back.
deltas: VecDeque<Arc<PayloadDelta>>,
/// The metrics stored by the most recent update, if any.
metrics: Option<Arc<Metrics>>,
/// Session value: seconds since the Unix epoch when the history was
/// created. `rtr_session` exposes it truncated to 16 bits.
session: u64,
/// The number of deltas to retain (taken from `config.history_size`).
keep: usize,
/// The configured refresh interval (taken from `config.refresh`).
refresh: Duration,
/// Optional lower bound used by `refresh_wait` (from `config.min_refresh`).
min_refresh: Option<Duration>,
/// Filter policy copied from `config.unsafe_vrps`; within this file it is
/// only stored and returned by its accessor.
unsafe_vrps: FilterPolicy,
/// When the last update run started; initialised to the creation time.
last_update_start: DateTime<Utc>,
/// When the last update run finished; `None` before the first one.
last_update_done: Option<DateTime<Utc>>,
/// How long the last update run took; `None` before the first one.
last_update_duration: Option<Duration>,
/// When the next update run is scheduled to start.
next_update_start: SystemTime,
/// Creation time stamp of the current data; advanced on every completed
/// update by at least one second.
created: Option<DateTime<Utc>>,
/// Base timing values built from the configuration.
timing: Timing,
}
impl PayloadHistory {
    /// Creates an empty history configured from `config`.
    ///
    /// The session value is the number of seconds since the Unix epoch at
    /// the time of the call. The timing values are the configured refresh,
    /// retry and expire intervals in seconds, saturated at `u32::MAX`.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    pub fn from_config(config: &Config) -> Self {
        /// Converts whole seconds to `u32`, saturating instead of wrapping.
        fn saturating_secs(secs: u64) -> u32 {
            u32::try_from(secs).unwrap_or(u32::MAX)
        }

        PayloadHistory {
            current: None,
            deltas: VecDeque::with_capacity(config.history_size),
            metrics: None,
            session: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("system clock is set before the Unix epoch")
                .as_secs(),
            keep: config.history_size,
            refresh: config.refresh,
            min_refresh: config.min_refresh,
            unsafe_vrps: config.unsafe_vrps,
            last_update_start: Utc::now(),
            last_update_done: None,
            last_update_duration: None,
            next_update_start: SystemTime::now() + config.refresh,
            created: None,
            // A plain `as u32` would silently wrap for very large
            // intervals; saturate like `SharedHistory::timing` does.
            timing: Timing {
                refresh: saturating_secs(config.refresh.as_secs()),
                retry: saturating_secs(config.retry.as_secs()),
                expire: saturating_secs(config.expire.as_secs()),
            },
        }
    }

    /// Adds a new delta as the newest entry, evicting the oldest ones so
    /// that at most `keep` deltas remain.
    ///
    /// A `keep` of zero is treated as one: the newest delta is always
    /// retained because `serial` is read from it, and without this floor
    /// the list would grow without bound.
    fn push_delta(&mut self, delta: PayloadDelta) {
        let limit = cmp::max(self.keep, 1);
        // `truncate` keeps the front (newest) entries and drops the rest.
        self.deltas.truncate(limit - 1);
        self.deltas.push_front(Arc::new(delta));
    }

    /// Returns whether a snapshot is available.
    pub fn is_active(&self) -> bool {
        self.current.is_some()
    }

    /// Returns a shared handle to the current snapshot, if there is one.
    pub fn current(&self) -> Option<Arc<PayloadSnapshot>> {
        self.current.clone()
    }

    /// Returns how long to wait before starting the next update run.
    ///
    /// This is the time remaining until `next_update_start` (zero if that
    /// has passed), but never less than `min_refresh` or, if that is not
    /// set, the configured refresh interval.
    pub fn refresh_wait(&self) -> Duration {
        let floor = self.min_refresh.unwrap_or(self.refresh);
        let until_next = self
            .next_update_start
            .duration_since(SystemTime::now())
            .unwrap_or_else(|_| Duration::from_secs(0));
        cmp::max(until_next, floor)
    }

    /// Returns an estimate of how long until the next update has finished.
    ///
    /// The estimate is the next scheduled start plus twice the duration of
    /// the last update run or, if no run has completed yet, plus the
    /// refresh interval. If that point lies in the past, the refresh
    /// interval is returned.
    pub fn update_wait(&self) -> Duration {
        let expected_done = match self.last_update_duration {
            Some(duration) => self.next_update_start + duration + duration,
            None => self.next_update_start + self.refresh,
        };
        expected_done
            .duration_since(SystemTime::now())
            .unwrap_or(self.refresh)
    }

    /// Returns the delta leading from `serial` to the current serial.
    ///
    /// Returns an empty delta if `serial` is the current serial, the newest
    /// delta if `serial` is exactly one behind, and otherwise the merge of
    /// all stored deltas following the one whose serial equals `serial`.
    ///
    /// Returns `None` if no such delta can be produced: `serial` is ahead
    /// of the current serial, is older than the oldest stored delta, or
    /// cannot be located among the stored deltas at all. Callers treat
    /// `None` as "fall back to the full data set".
    pub fn delta_since(&self, serial: Serial) -> Option<Arc<PayloadDelta>> {
        // Cases that can be answered from the newest delta alone.
        match self.deltas.front() {
            Some(newest) => {
                let newest_serial = newest.serial();
                if newest_serial < serial {
                    // The caller claims a serial we have not reached.
                    return None;
                }
                if newest_serial == serial {
                    // The caller is already up to date.
                    return Some(Arc::new(PayloadDelta::empty(serial)));
                }
                if newest_serial == serial.add(1) {
                    // Exactly one behind: the newest delta is the answer.
                    return Some(newest.clone());
                }
            }
            None => {
                // Without deltas, `serial()` reports 0.
                return if serial == 0 {
                    Some(Arc::new(PayloadDelta::empty(serial)))
                }
                else {
                    None
                };
            }
        }

        // Walk from the oldest delta towards the newest, skipping until the
        // delta whose serial equals the caller's serial.
        let mut remaining = self.deltas.iter().rev();
        for delta in &mut remaining {
            match delta.serial().partial_cmp(&serial) {
                // Even the oldest delta is newer than what the caller has.
                Some(cmp::Ordering::Greater) => return None,
                Some(cmp::Ordering::Equal) => break,
                // Older than the caller's serial or incomparable: skip.
                Some(cmp::Ordering::Less) | None => continue,
            }
        }

        // If nothing is left, the caller's serial was never found. Handing
        // out an empty delta here would tell the caller that nothing has
        // changed up to the current serial, so request a reset instead.
        let first = remaining.next()?.clone();
        Some(remaining.fold(first, |merged, delta| {
            Arc::new(merged.merge(delta))
        }))
    }

    /// Returns the current serial number.
    ///
    /// This is the serial of the newest delta, or 0 if there are no deltas.
    pub fn serial(&self) -> Serial {
        self.deltas
            .front()
            .map(|delta| delta.serial())
            .unwrap_or_else(|| 0.into())
    }

    /// Returns the full 64 bit session value.
    pub fn session(&self) -> u64 {
        self.session
    }

    /// Returns the session value and the current serial number.
    pub fn session_and_serial(&self) -> (u64, Serial) {
        (self.session(), self.serial())
    }

    /// Returns the session value truncated to its lower 16 bits.
    pub fn rtr_session(&self) -> u16 {
        // Truncation is intended: only 16 bits are handed out here.
        self.session as u16
    }

    /// Returns the metrics of the most recent update, if any.
    pub fn metrics(&self) -> Option<Arc<Metrics>> {
        self.metrics.clone()
    }

    /// Returns when the last update run was started.
    pub fn last_update_start(&self) -> DateTime<Utc> {
        self.last_update_start
    }

    /// Returns when the last update run finished, if one has.
    pub fn last_update_done(&self) -> Option<DateTime<Utc>> {
        self.last_update_done
    }

    /// Returns how long the last update run took, if one has finished.
    pub fn last_update_duration(&self) -> Option<Duration> {
        self.last_update_duration
    }

    /// Returns the creation time stamp of the current data, if any.
    pub fn created(&self) -> Option<DateTime<Utc>> {
        self.created
    }

    /// Returns the configured policy for unsafe VRPs.
    pub fn unsafe_vrps(&self) -> FilterPolicy {
        self.unsafe_vrps
    }
}