use std::str::FromStr;
use std::sync::Arc;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream;
use rpki::rtr::Serial;
use rpki::rtr::payload::{Action, PayloadRef};
use rpki::rtr::server::{NotifySender, PayloadDiff};
use crate::payload::{
DeltaArcIter, PayloadDelta, PayloadSnapshot, SharedHistory, SnapshotArcIter
};
use crate::utils::fmt::WriteOrPanic;
use crate::utils::date::format_iso_date;
use crate::utils::json::JsonBuilder;
use super::request::Request;
use super::response::{ContentType, Response, ResponseBuilder};
pub fn handle_get_or_head(
req: Request,
history: &SharedHistory,
) -> Result<Response, Request> {
if req.uri().path() != "/json-delta" {
return Err(req)
}
let history = history.read();
if !history.is_active() {
return Ok(Response::initial_validation(true))
}
let version = match version_from_query(req.uri().query()) {
Ok(version) => version,
Err(response) => return Ok(response)
};
if req.is_head() {
return Ok(
ResponseBuilder::ok().content_type(ContentType::JSON).empty()
)
}
let created = history.created().unwrap_or(Utc::now());
if let Some((session, serial)) = version {
if session == history.session() {
if let Some(delta) = history.delta_since(serial) {
return Ok(handle_delta(
session, serial, history.serial(), delta, created
))
}
}
}
let snapshot = match history.current() {
Some(snapshot) => snapshot,
None => return Ok(Response::initial_validation(true)),
};
Ok(handle_reset(history.session(), history.serial(), snapshot, created))
}
fn handle_delta(
session: u64, from_serial: Serial, to_serial: Serial,
delta: Arc<PayloadDelta>, created: DateTime<Utc>,
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON).stream(
stream::iter(
DeltaStream::new(session, from_serial, to_serial, delta, created)
)
)
}
fn handle_reset(
session: u64, to_serial: Serial, snapshot: Arc<PayloadSnapshot>,
created: DateTime<Utc>,
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON).stream(
stream::iter(
SnapshotStream::new(session, to_serial, snapshot, created)
)
)
}
pub async fn handle_notify_get_or_head(
req: Request,
history: &SharedHistory,
notify: &NotifySender,
) -> Result<Response, Request> {
if req.uri().path() != "/json-delta/notify" {
return Err(req)
}
let wait = match need_wait(&req, history) {
Ok(wait) => wait,
Err(resp) => return Ok(resp),
};
if wait {
notify.subscribe().recv().await;
}
if req.is_head() {
Ok(
ResponseBuilder::ok().content_type(ContentType::JSON).empty()
)
}
else {
let (session, serial) = history.read().session_and_serial();
Ok(
ResponseBuilder::ok().content_type(ContentType::JSON).body(
JsonBuilder::build(|json| {
json.member_raw("session", session);
json.member_raw("serial", serial);
})
)
)
}
}
#[allow(clippy::result_large_err)]
fn need_wait(
req: &Request,
history: &SharedHistory,
) -> Result<bool, Response> {
let version = match version_from_query(req.uri().query())? {
Some(version) => version,
None => return Ok(false),
};
Ok(history.read().session_and_serial() == version)
}
#[allow(clippy::result_large_err)]
fn version_from_query(
query: Option<&str>
) -> Result<Option<(u64, Serial)>, Response> {
let query = match query {
Some(query) => query,
None => return Ok(None)
};
let mut session = None;
let mut serial = None;
for (key, value) in form_urlencoded::parse(query.as_ref()) {
if key == "session" {
if session.is_some() {
return Err(Response::bad_request(
true, "duplicate 'session' argument in query"
));
}
session = Some(u64::from_str(&value).map_err(|_| {
Response::bad_request(
true, "invalid 'session' argument in query"
)
})?);
}
else if key == "serial" {
if serial.is_some() {
return Err(Response::bad_request(
true, "duplicate 'serial' argument in query"
));
}
serial = Some(Serial::from_str(&value).map_err(|_| {
Response::bad_request(
true, "invalid 'serial' argument in query"
)
})?);
}
else {
return Err(Response::bad_request(
true, format_args!("unexpected argument '{key}' in query")
));
}
}
match (session, serial) {
(Some(session), Some(serial)) => Ok(Some((session, serial))),
(None, None) => Ok(None),
(Some(_), None) => {
Err(Response::bad_request(
true, "missing 'serial' argument in query"
))
}
(None, Some(_)) => {
Err(Response::bad_request(
true, "missing 'session' argument in query"
))
}
}
}
struct DeltaStream {
header: Option<Vec<u8>>,
announce: Option<DeltaArcIter>,
withdraw: Option<DeltaArcIter>,
first: bool,
}
impl DeltaStream {
fn new(
session: u64, from_serial: Serial, to_serial: Serial,
delta: Arc<PayloadDelta>, created: DateTime<Utc>,
) -> Self {
let mut vec = Vec::new();
Self::append_header(
&mut vec, session, from_serial, to_serial, created
);
DeltaStream {
header: Some(vec),
announce: Some(delta.clone().arc_iter()),
withdraw: Some(delta.arc_iter()),
first: true,
}
}
fn append_header(
vec: &mut Vec<u8>,
session: u64, from_serial: Serial, to_serial: Serial,
created: DateTime<Utc>,
) {
write!(vec, "\
{{\
\n \"reset\": false,\
\n \"session\": \"{}\",\
\n \"serial\": {},\
\n \"fromSerial\": {},\
\n \"generated\": {},\
\n \"generatedTime\": \"{}\",\
\n \"announced\": [",
session, to_serial, from_serial,
created.timestamp(), format_iso_date(created),
)
}
fn append_separator(
vec: &mut Vec<u8>,
) {
write!(vec, "\
\n ],\
\n \"withdrawn\": [",
)
}
fn append_payload(
vec: &mut Vec<u8>,
payload: PayloadRef,
first: bool
) {
if !first {
vec.push(b',')
}
match payload {
PayloadRef::Origin(origin) => {
write!(vec, "\
\n {{\
\n \"type\": \"routeOrigin\",\
\n \"asn\": \"{}\",\
\n \"prefix\": \"{}/{}\",\
\n \"maxLength\": {}\
\n }}",
origin.asn,
origin.prefix.addr(), origin.prefix.prefix_len(),
origin.prefix.resolved_max_len()
)
},
PayloadRef::RouterKey(key) => {
write!(vec, "\
\n {{\
\n \"type\": \"routerKey\",\
\n \"keyIdentifier\": \"{}\",\
\n \"asn\": \"{}\",\
\n \"keyInfo\": \"{}\"
\n }}",
key.key_identifier,
key.asn,
key.key_info,
)
}
PayloadRef::Aspa(aspa) => {
write!(vec, "\
\n {{\
\n \"type\": \"aspa\",
\n \"customerAsn\": \"{}\",\
\n \"providerAsns\": [",
aspa.customer,
);
let mut first = true;
for asn in aspa.providers.iter() {
if first {
write!(vec, "\"{asn}\"");
first = false
}
else {
write!(vec, ", \"{asn}\"");
}
}
write!(vec, "]\n\n }}");
}
}
}
fn append_footer(vec: &mut Vec<u8>) {
vec.extend_from_slice(b"\n ]\n}\n");
}
}
impl Iterator for DeltaStream {
type Item = Bytes;
fn next(&mut self) -> Option<Self::Item> {
#[allow(clippy::question_mark)]
if self.withdraw.is_none() {
return None
}
let mut vec = self.header.take().unwrap_or_default();
loop {
if vec.len() > 64000 {
return Some(vec.into())
}
if self.next_announce(&mut vec) {
continue;
}
if !self.next_withdraw(&mut vec) {
return Some(vec.into())
}
}
}
}
impl DeltaStream {
fn next_announce(&mut self, vec: &mut Vec<u8>) -> bool {
if let Some(announce) = self.announce.as_mut() {
while let Some((payload, action)) = announce.next() {
if matches!(action, Action::Announce) {
Self::append_payload(vec, payload, self.first);
self.first = false;
return true
}
}
}
else {
return false;
}
Self::append_separator(vec);
self.announce = None;
self.first = true;
true
}
fn next_withdraw(&mut self, vec: &mut Vec<u8>) -> bool {
if let Some(withdraw) = self.withdraw.as_mut() {
while let Some((payload, action)) = withdraw.next() {
if matches!(action, Action::Withdraw) {
Self::append_payload(vec, payload, self.first);
self.first = false;
return true
}
}
}
else {
return false;
}
Self::append_footer(vec);
self.withdraw = None;
false
}
}
struct SnapshotStream {
header: Option<Vec<u8>>,
iter: Option<SnapshotArcIter>,
}
impl SnapshotStream {
fn new(
session: u64, to_serial: Serial, snapshot: Arc<PayloadSnapshot>,
created: DateTime<Utc>,
) -> Self {
let mut vec = Vec::new();
Self::append_header(&mut vec, session, to_serial, created);
SnapshotStream {
header: Some(vec),
iter: Some(snapshot.arc_iter()),
}
}
fn append_header(
vec: &mut Vec<u8>,
session: u64, to_serial: Serial, created: DateTime<Utc>,
) {
write!(vec, "\
{{\
\n \"reset\": true,\
\n \"session\": \"{}\",\
\n \"serial\": {},\
\n \"generated\": {},\
\n \"generatedTime\": \"{}\",\
\n \"announced\": [",
session, to_serial,
created.timestamp(), format_iso_date(created),
)
}
}
impl Iterator for SnapshotStream {
type Item = Bytes;
fn next(&mut self) -> Option<Self::Item> {
use rpki::rtr::server::PayloadSet;
let iter = self.iter.as_mut()?;
let mut first = self.header.is_some();
let mut vec = self.header.take().unwrap_or_default();
loop {
if vec.len() > 64000 {
return Some(vec.into())
}
match iter.next() {
Some(payload) => {
DeltaStream::append_payload(
&mut vec, payload, first,
);
}
None => {
break
}
}
first = false;
}
self.iter = None;
DeltaStream::append_footer(&mut vec);
Some(vec.into())
}
}