use std::sync::Arc;
use std::cmp::Ordering;
use rpki::rtr::{Action, PayloadRef, PayloadType, Serial};
use rpki::rtr::payload::{Aspa, RouteOrigin, RouterKey};
use rpki::rtr::pdu::ProviderAsns;
use rpki::rtr::server::PayloadDiff;
use super::info::PayloadInfo;
use super::snapshot::PayloadSnapshot;
#[derive(Clone, Debug)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct PayloadDelta {
serial: Serial,
origins: StandardDelta<RouteOrigin>,
router_keys: StandardDelta<RouterKey>,
aspas: AspaDelta,
}
impl PayloadDelta {
pub fn empty(serial: Serial) -> Self {
PayloadDelta {
serial,
origins: Default::default(),
router_keys: Default::default(),
aspas: Default::default(),
}
}
pub fn construct(
old: &PayloadSnapshot, new: &PayloadSnapshot, serial: Serial,
) -> Option<Self> {
let res = Self {
serial: serial.add(1),
origins: StandardDelta::construct(
old.origin_refs().map(|item| item.0),
new.origin_refs().map(|item| item.0),
),
router_keys: StandardDelta::construct(
old.router_keys().map(|item| item.0),
new.router_keys().map(|item| item.0),
),
aspas: AspaDelta::construct(
old.aspas(), new.aspas()
)
};
if res.is_empty() {
None
}
else {
Some(res)
}
}
pub fn merge(&self, new: &Self) -> Self {
Self {
serial: new.serial,
origins: StandardDelta::merge(&self.origins, &new.origins),
router_keys: StandardDelta::merge(
&self.router_keys, &new.router_keys
),
aspas: AspaDelta::merge(&self.aspas, &new.aspas),
}
}
pub fn is_empty(&self) -> bool {
self.origins.is_empty()
&& self.router_keys.is_empty()
&& self.aspas.is_empty()
}
pub fn serial(&self) -> Serial {
self.serial
}
pub fn announce_len(&self) -> usize {
self.origins.announce_len
+ self.router_keys.announce_len
+ self.aspas.announce_len
}
pub fn withdraw_len(&self) -> usize {
self.origins.withdraw_len
+ self.router_keys.withdraw_len
+ self.aspas.withdraw_len
}
pub fn arc_iter(self: Arc<Self>) -> DeltaArcIter {
DeltaArcIter::new(self)
}
pub fn origin_actions(
&self
) -> impl Iterator<Item = (RouteOrigin, Action)> + '_ {
self.origins.items.iter().map(|(item, action)| (*item, *action))
}
pub fn router_key_actions(
&self
) -> impl Iterator<Item = (&RouterKey, Action)> + '_ {
self.router_keys.items.iter().map(|(item, action)| (item, *action))
}
pub fn aspa_actions(
&self
) -> impl Iterator<Item = (&Aspa, Action)> + '_ {
self.aspas.items.iter().map(|(item, action)| (item, action.into()))
}
pub fn actions(
&self
) -> impl Iterator<Item = (PayloadRef<'_>, Action)> + '_ {
self.origin_actions().map(|(p, a)| (p.into(), a)).chain(
self.router_key_actions().map(|(p, a)| (p.into(), a))
).chain(
self.aspa_actions().map(|(p, a)| (p.into(), a))
)
}
}
#[derive(Clone, Debug)]
struct StandardDelta<P> {
items: Vec<(P, Action)>,
announce_len: usize,
withdraw_len: usize,
}
impl<P> Default for StandardDelta<P> {
fn default() -> Self {
Self {
items: Vec::default(),
announce_len: 0,
withdraw_len: 0,
}
}
}
impl<P: Clone + Ord> StandardDelta<P> {
fn construct<'a>(
mut old_iter: impl Iterator<Item = &'a P>,
mut new_iter: impl Iterator<Item = &'a P>,
) -> Self
where P: 'a {
let mut items = Self::default();
let mut opt_old = old_iter.next();
let mut opt_new = new_iter.next();
loop {
let old_item = match opt_old {
Some(item) => item,
None => {
if let Some(new_item) = opt_new {
items.push((new_item.clone(), Action::Announce));
}
items.extend(
new_iter.map(|x| (x.clone(), Action::Announce))
);
break;
}
};
let new_item = match opt_new {
Some(item) => item,
None => {
items.push((old_item.clone(), Action::Withdraw));
items.extend(
old_iter.map(|x| (x.clone(), Action::Withdraw))
);
break;
}
};
match old_item.cmp(new_item) {
Ordering::Less => {
items.push((old_item.clone(), Action::Withdraw));
opt_old = old_iter.next();
}
Ordering::Equal => {
opt_old = old_iter.next();
opt_new = new_iter.next();
}
Ordering::Greater => {
items.push((new_item.clone(), Action::Announce));
opt_new = new_iter.next();
}
}
}
items
}
fn merge(old: &Self, new: &Self) -> Self {
let mut items = Self::default();
let mut old_iter = old.items.iter();
let mut new_iter = new.items.iter();
let mut opt_old = old_iter.next();
let mut opt_new = new_iter.next();
loop {
let old_item = match opt_old {
Some(some) => some,
None => {
if let Some(item) = opt_new {
items.push(item.clone())
}
items.extend(new_iter.cloned());
break;
}
};
let new_item = match opt_new {
Some(some) => some,
None => {
if let Some(item) = opt_old {
items.push(item.clone())
}
items.extend(old_iter.cloned());
break;
}
};
match old_item.0.cmp(&new_item.0) {
Ordering::Less => {
items.push(old_item.clone());
opt_old = old_iter.next();
}
Ordering::Greater => {
items.push(new_item.clone());
opt_new = new_iter.next();
}
Ordering::Equal => {
use rpki::rtr::payload::Action::*;
let action = match (old_item.1, new_item.1) {
(Announce, Announce) => Some(Announce),
(Announce, Withdraw) => None,
(Withdraw, Announce) => None,
(Withdraw, Withdraw) => Some(Withdraw)
};
if let Some(action) = action {
items.push((new_item.0.clone(), action));
}
opt_new = new_iter.next();
opt_old = old_iter.next();
}
}
}
items
}
}
impl<P> StandardDelta<P> {
fn push(&mut self, (payload, action): (P, Action)) {
match action {
Action::Announce => self.announce_len += 1,
Action::Withdraw => self.withdraw_len += 1,
}
self.items.push((payload, action))
}
fn extend(&mut self, iter: impl Iterator<Item = (P, Action)>) {
iter.for_each(|item| self.push(item))
}
fn is_empty(&self) -> bool {
self.items.is_empty()
}
fn get(&self, idx: usize) -> Option<(&P, Action)> {
self.items.get(idx).map(|item| (&item.0, item.1))
}
}
#[cfg(feature = "arbitrary")]
impl<'a, P> arbitrary::Arbitrary<'a> for StandardDelta<P>
where P: arbitrary::Arbitrary<'a> + Ord {
fn arbitrary(
u: &mut arbitrary::Unstructured<'a>
) -> arbitrary::Result<Self> {
let mut items = Vec::<(P, Action)>::arbitrary(u)?;
items.sort_by(|left, right| left.0.cmp(&right.0));
items.dedup_by(|left, right| left.0 == right.0);
let announce_len = items.iter().filter(|(_, action)| {
matches!(action, Action::Announce)
}).count();
let withdraw_len = items.iter().filter(|(_, action)| {
matches!(action, Action::Withdraw)
}).count();
Ok(Self { items, announce_len, withdraw_len })
}
}
#[derive(Clone, Debug, Default)]
struct AspaDelta {
items: Vec<(Aspa, AspaAction)>,
announce_len: usize,
withdraw_len: usize,
}
impl AspaDelta {
fn construct<'a>(
old_iter: impl Iterator<Item = (&'a Aspa, &'a PayloadInfo)>,
new_iter: impl Iterator<Item = (&'a Aspa, &'a PayloadInfo)>,
) -> Self {
use self::AspaAction::*;
let mut items = Self::default();
let mut old_iter = old_iter.map(|(item, _)| item);
let mut new_iter = new_iter.map(|(item, _)| item);
let mut opt_old = old_iter.next();
let mut opt_new = new_iter.next();
loop {
let old_item = match opt_old {
Some(item) => item,
None => {
if let Some(new_item) = opt_new {
items.push((new_item.clone(), Announce));
}
items.extend(new_iter.map(|x| {
(x.clone(), Announce)
}));
break;
}
};
let new_item = match opt_new {
Some(item) => item,
None => {
items.push(AspaAction::withdraw(old_item));
items.extend(old_iter.map(AspaAction::withdraw));
break;
}
};
match old_item.key().cmp(&new_item.key()) {
Ordering::Less => {
items.push(AspaAction::withdraw(old_item));
opt_old = old_iter.next();
}
Ordering::Equal => {
if old_item.providers != new_item.providers {
items.push((
new_item.clone(),
Update(old_item.providers.clone())
))
}
opt_old = old_iter.next();
opt_new = new_iter.next();
}
Ordering::Greater => {
items.push((new_item.clone(), Announce));
opt_new = new_iter.next();
}
}
}
items
}
fn merge(old: &Self, new: &Self) -> Self {
let mut items = Self::default();
let mut old_iter = old.items.iter();
let mut new_iter = new.items.iter();
let mut opt_old = old_iter.next();
let mut opt_new = new_iter.next();
loop {
let old_item = match opt_old {
Some(some) => some,
None => {
if let Some(item) = opt_new {
items.push(item.clone())
}
items.extend(new_iter.cloned());
break;
}
};
let new_item = match opt_new {
Some(some) => some,
None => {
if let Some(item) = opt_old {
items.push(item.clone())
}
items.extend(old_iter.cloned());
break;
}
};
match old_item.0.key().cmp(&new_item.0.key()) {
Ordering::Less => {
items.push(old_item.clone());
opt_old = old_iter.next();
}
Ordering::Greater => {
items.push(new_item.clone());
opt_new = new_iter.next();
}
Ordering::Equal => {
use self::AspaAction::*;
let action = match (&old_item.1, &new_item.1) {
(Announce, Announce) => Some(Announce),
(Announce, Update(_)) => Some(Announce),
(Announce, Withdraw(_)) => None,
(Update(ref p), Announce) => Some(Update(p.clone())),
(Update(ref p), Update(_)) => {
if *p == new_item.0.providers {
None
}
else {
Some(Update(p.clone()))
}
}
(Update(ref p), Withdraw(_)) => {
Some(Withdraw(p.clone()))
}
(Withdraw(ref p), Announce) => {
if *p == new_item.0.providers {
None
}
else {
Some(Update(p.clone()))
}
}
(Withdraw(ref p), Update(_)) => {
if *p == new_item.0.providers {
None
}
else {
Some(Update(p.clone()))
}
}
(Withdraw(ref p), Withdraw(_)) => {
Some(Withdraw(p.clone()))
}
};
if let Some(action) = action {
items.push((new_item.0.clone(), action));
}
opt_old = old_iter.next();
opt_new = new_iter.next();
}
}
}
items
}
fn push(&mut self, (payload, action): (Aspa, AspaAction)) {
match action {
AspaAction::Announce | AspaAction::Update(_) => {
self.announce_len += 1
}
AspaAction::Withdraw(_) => self.withdraw_len += 1,
}
self.items.push((payload, action))
}
fn extend(&mut self, iter: impl Iterator<Item = (Aspa, AspaAction)>) {
iter.for_each(|item| self.push(item))
}
fn is_empty(&self) -> bool {
self.items.is_empty()
}
fn get(&self, idx: usize) -> Option<(&Aspa, Action)> {
self.items.get(idx).map(|item| (&item.0, (&item.1).into()))
}
}
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for AspaDelta {
fn arbitrary(
u: &mut arbitrary::Unstructured<'a>
) -> arbitrary::Result<Self> {
let mut items = Vec::<(Aspa, AspaAction)>::arbitrary(u)?;
items.sort_by(|left, right| left.0.cmp(&right.0));
items.dedup_by(|left, right| left.0 == right.0);
let announce_len = items.iter().filter(|(_, action)| {
matches!(action, AspaAction::Announce | AspaAction::Update(_))
}).count();
let withdraw_len = items.iter().filter(|(_, action)| {
matches!(action, AspaAction::Withdraw(_))
}).count();
Ok(Self { items, announce_len, withdraw_len })
}
}
#[derive(Clone, Debug)]
pub struct DeltaArcIter {
delta: Arc<PayloadDelta>,
current_type: PayloadType,
next: usize
}
impl DeltaArcIter {
fn new(delta: Arc<PayloadDelta>) -> Self {
Self {
delta,
current_type: PayloadType::Origin,
next: 0,
}
}
}
impl PayloadDiff for DeltaArcIter {
fn next(&mut self) -> Option<(PayloadRef<'_>, Action)> {
if matches!(self.current_type, PayloadType::Origin) {
if let Some(res) = self.delta.origins.get(self.next) {
self.next += 1;
return Some((res.0.into(), res.1));
}
self.current_type = PayloadType::RouterKey;
self.next = 0;
}
if matches!(self.current_type, PayloadType::RouterKey) {
if let Some(res) = self.delta.router_keys.get(self.next) {
self.next += 1;
return Some((res.0.into(), res.1));
}
self.current_type = PayloadType::Aspa;
self.next = 0;
}
assert!(matches!(self.current_type, PayloadType::Aspa));
let res = self.delta.aspas.get(self.next)?;
self.next += 1;
Some((res.0.into(), res.1))
}
}
#[derive(Clone, Debug)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
enum AspaAction {
Announce,
Update(ProviderAsns),
Withdraw(ProviderAsns),
}
impl AspaAction {
fn withdraw(aspa: &Aspa) -> (Aspa, Self) {
(aspa.withdraw(), Self::Withdraw(aspa.providers.clone()))
}
}
impl From<AspaAction> for Action {
fn from(src: AspaAction) -> Self {
match src {
AspaAction::Announce => Action::Announce,
AspaAction::Update(_) => Action::Announce,
AspaAction::Withdraw(_) => Action::Withdraw,
}
}
}
impl<'a> From<&'a AspaAction> for Action {
fn from(src: &'a AspaAction) -> Self {
match *src {
AspaAction::Announce => Action::Announce,
AspaAction::Update(_) => Action::Announce,
AspaAction::Withdraw(_) => Action::Withdraw,
}
}
}
#[cfg(test)]
mod test {
use super::*;
use std::collections::HashSet;
#[test]
fn standard_construct() {
fn process(old: &mut [u32], new: &mut [u32]) {
let old_set: HashSet<_> = old.iter().copied().collect();
let new_set: HashSet<_> = new.iter().copied().collect();
old.sort();
new.sort();
let delta = StandardDelta::construct(old.iter(), new.iter());
let mut announce_set: Vec<_> = new_set.difference(
&old_set
).copied().collect();
announce_set.sort();
let announce = delta.items.iter().filter_map(|(item, action)| {
match action {
Action::Announce => Some(*item),
Action::Withdraw => None
}
}).collect::<Vec<_>>();
let mut withdraw_set: Vec<_> = old_set.difference(
&new_set
).copied().collect();
withdraw_set.sort();
let withdraw = delta.items.iter().filter_map(|(item, action)| {
match action {
Action::Withdraw => Some(*item),
Action::Announce => None,
}
}).collect::<Vec<_>>();
assert_eq!(announce.len(), delta.announce_len);
assert_eq!(withdraw.len(), delta.withdraw_len);
assert_eq!(announce, announce_set);
assert_eq!(withdraw, withdraw_set);
}
process(&mut [], &mut []);
process(&mut [0, 1, 2, 3], &mut [0, 1, 2, 3]);
process(&mut [], &mut [0, 1, 2, 3]);
process(&mut [0, 1, 2, 3], &mut []);
process(&mut [0, 2, 3], &mut [0, 1, 2, 3]);
process(&mut [0, 1, 2, 3], &mut [0, 2, 3]);
process(&mut [ 1, 2, 3], &mut [0, 1, 2, 3]);
process(&mut [0, 1, 2, 3], &mut [ 1, 2, 3]);
process(&mut [0, 1, 2 ], &mut [0, 1, 2, 3]);
process(&mut [0, 1, 2, 3], &mut [0, 1, 2 ]);
process(&mut [0, 1, 2, 3], &mut [0, 1, 2, 3]);
process(&mut [0, 3], &mut [0, 1, 2, 3]);
process(&mut [0, 1, 2, 3], &mut [0, 3]);
process(&mut [ 2, 3], &mut [0, 1, 2, 3]);
process(&mut [0, 1, 2, 3], &mut [ 2, 3]);
process(&mut [0, 1, ], &mut [0, 1, 2, 3]);
process(&mut [0, 1, 2, 3], &mut [0, 1, ]);
}
}