use std::{fmt::Display, io, net::{IpAddr, Ipv4Addr, Ipv6Addr}};
use std::io::Write;
use axum::{body::Body, extract::{Path, Query, State}, response::IntoResponse};
use bytes::Bytes;
use inetnum::{addr::Prefix, asn::Asn};
use log::{debug, warn};
use routecore::{bgp::{communities::{LargeCommunity, StandardCommunity}, path_attributes::PathAttributeType, types::AfiSafiType}, bmp::message::RibType};
use serde::Deserialize;
use serde_with::serde_as;
use serde_with::formats::CommaSeparator;
use serde_with::StringWithSeparator;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use crate::{http_ng::{Api, ApiError, ApiState}, ingress::IngressId, representation::{GenOutput, Json}, roto_runtime::types::PeerRibType, units::rib_unit::rpki::RovStatus};
/// Registers the GET endpoints for searching routes in the RIB on `router`.
///
/// IPv4 unicast and IPv6 unicast each get a per-prefix endpoint
/// (`.../routes/{prefix}/{prefix_len}`) and an endpoint without a prefix
/// (`.../routes`). A parameterised `/ribs/{afisafi}/routes` endpoint is bound
/// to `generic_afisafi_all`.
// NOTE(review): the literal `ipv4unicast`/`ipv6unicast` paths overlap with the
// `{afisafi}` pattern; which one wins depends on the router behind
// `Api::add_get` -- confirm before reordering these registrations.
pub fn register_routes(router: &mut Api) {
router.add_get("/ribs/ipv4unicast/routes/{prefix}/{prefix_len}", search_ipv4unicast);
router.add_get("/ribs/ipv4unicast/routes", search_ipv4unicast_all);
router.add_get("/ribs/ipv6unicast/routes/{prefix}/{prefix_len}", search_ipv6unicast);
router.add_get("/ribs/ipv6unicast/routes", search_ipv6unicast_all);
router.add_get("/ribs/{afisafi}/routes", generic_afisafi_all);
}
/// Address families accepted as the `{afisafi}` path segment.
///
/// Each variant deserializes from its all-lowercase name, i.e. `ipv4unicast`
/// and `ipv6unicast`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
enum SupportedAfiSafi {
    Ipv4Unicast,
    Ipv6Unicast,
}
/// Query-string options accepted by the route search endpoints.
///
/// Deserialized with serde. Fields without an explicit `rename` use their
/// camelCase name as the query key (so `ingress_id` is read from `ingressId`);
/// all other fields spell out their key in a `rename` attribute.
#[serde_as]
#[derive(Clone, Debug, Default, Deserialize)]
#[serde(rename_all(deserialize = "camelCase"))]
pub struct QueryFilter {
/// Key `include`: comma-separated list of [`Include`] values, each parsed
/// through `Include`'s `FromStr` impl. Empty when the key is absent.
#[serde(default)]
#[serde_as(as = "StringWithSeparator::<CommaSeparator, Include>")]
pub include: Vec<Include>,
/// Key `ingressId`.
// NOTE(review): presumably restricts results to a single ingress -- confirm
// against `search_routes`.
pub ingress_id: Option<IngressId>,
/// Key `filter[originAsn]`.
#[serde(rename = "filter[originAsn]")]
pub origin_asn: Option<Asn>,
/// Key `filter[otc]`.
// NOTE(review): presumably the Only-To-Customer attribute value -- confirm.
#[serde(rename = "filter[otc]")]
pub otc: Option<Asn>,
/// Key `filter[community]`: a standard community, parsed from its string
/// form via `FromStr`.
#[serde(rename = "filter[community]")]
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
pub community: Option<StandardCommunity>,
/// Key `filter[largeCommunity]`: a large community, parsed from its string
/// form via `FromStr`.
#[serde(rename = "filter[largeCommunity]")]
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
pub large_community: Option<LargeCommunity>,
/// Key `filter[ribType]`.
#[serde(rename = "filter[ribType]")]
pub rib_type: Option<PeerRibType>,
/// Key `filter[rovStatus]`.
#[serde(rename = "filter[rovStatus]")]
pub rov_status: Option<RovStatus>,
/// Key `filter[peerAsn]`.
#[serde(rename = "filter[peerAsn]")]
pub peer_asn: Option<Asn>,
/// Key `filter[peerAddress]`: an IPv4 or IPv6 address.
#[serde(rename = "filter[peerAddress]")]
pub peer_addr: Option<IpAddr>,
/// Key `fields[pathAttributes]`: comma-separated list of `u8` values.
// NOTE(review): presumably path attribute type codes selecting which
// attributes appear in the output -- confirm against the consumer.
#[serde_as(as = "Option<StringWithSeparator::<CommaSeparator, u8>>")]
#[serde(rename = "fields[pathAttributes]")]
pub fields_path_attributes: Option<Vec<u8>>,
/// Key `function[roto]`.
// NOTE(review): presumably the name of a roto function to apply to the
// results -- confirm against the consumer.
#[serde(rename = "function[roto]")]
pub roto_function: Option<String>
}
impl QueryFilter {
    /// Makes sure `Include::MoreSpecifics` is part of `include`.
    ///
    /// The variant is appended only when it is not already present, so calling
    /// this repeatedly never produces duplicates.
    pub fn enable_more_specifics(&mut self) {
        let already_requested = self
            .include
            .iter()
            .any(|item| matches!(item, Include::MoreSpecifics));
        if already_requested {
            return;
        }
        self.include.push(Include::MoreSpecifics);
    }
}
/// Values accepted in the `include` query parameter of [`QueryFilter`].
///
/// Parsed from the strings `moreSpecifics` and `lessSpecifics` by the
/// `FromStr` impl in this file.
#[derive(Clone, Debug, PartialEq)]
pub enum Include {
/// Parsed from `moreSpecifics`.
MoreSpecifics,
/// Parsed from `lessSpecifics`.
LessSpecifics,
}
/// Size in bytes (256 KiB) used both as the initial capacity of
/// `StreamResponseWriter`'s buffer and as the fill level at which `write`
/// hands the buffered bytes to the channel.
const STREAM_CHUNK_SIZE: usize = 256 * 1024;
/// An `io::Write` sink that accumulates bytes and forwards them in chunks
/// over a tokio mpsc channel.
struct StreamResponseWriter {
/// Channel end the chunks are pushed into (with `blocking_send`).
sender: mpsc::Sender<Result<Bytes, io::Error>>,
/// Bytes written so far that have not been sent yet.
buffer: Vec<u8>,
}
impl StreamResponseWriter {
    /// Creates a writer that forwards its output to `sender`, with a buffer
    /// pre-allocated to `STREAM_CHUNK_SIZE` bytes.
    fn new(sender: mpsc::Sender<Result<Bytes, io::Error>>) -> Self {
        let buffer = Vec::with_capacity(STREAM_CHUNK_SIZE);
        Self { sender, buffer }
    }

    /// Sends everything currently buffered as a single chunk.
    ///
    /// Does nothing when the buffer is empty. The buffer is emptied (keeping
    /// its allocation) before the send is attempted.
    ///
    /// # Errors
    ///
    /// Returns a `BrokenPipe` error when the chunk cannot be delivered
    /// because the send on the channel fails.
    fn send_buffer(&mut self) -> io::Result<()> {
        if !self.buffer.is_empty() {
            let chunk = Bytes::copy_from_slice(&self.buffer);
            self.buffer.clear();
            // `blocking_send` parks the current thread until there is room in
            // the channel, so this must run outside the async executor.
            if self.sender.blocking_send(Ok(chunk)).is_err() {
                return Err(io::Error::new(
                    io::ErrorKind::BrokenPipe,
                    "receiver dropped",
                ));
            }
        }
        Ok(())
    }
}
impl io::Write for StreamResponseWriter {
    /// Appends `buf` to the internal buffer and, once at least
    /// `STREAM_CHUNK_SIZE` bytes have accumulated, sends them as one chunk.
    ///
    /// On success the whole of `buf` is always reported as written.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = buf.len();
        self.buffer.extend_from_slice(buf);
        if self.buffer.len() < STREAM_CHUNK_SIZE {
            return Ok(accepted);
        }
        self.send_buffer().map(|()| accepted)
    }

    /// Sends whatever is still buffered, regardless of how little it is.
    fn flush(&mut self) -> io::Result<()> {
        self.send_buffer()
    }
}
fn stream_search_result(
search_result: super::rib::SearchResult,
) -> impl IntoResponse {
let (tx, rx) = mpsc::channel::<Result<Bytes, io::Error>>(64);
let stream = ReceiverStream::new(rx);
tokio::task::spawn_blocking(move || {
let mut writer = StreamResponseWriter::new(tx);
let _ = search_result.write(&mut Json(&mut writer));
let _ = writer.flush();
});
([("content-type", "application/json")], Body::from_stream(stream))
}
/// Error produced when a string does not name a known `include` value.
#[derive(Debug)]
pub struct UnknownInclude;

impl Display for UnknownInclude {
    /// Writes the fixed message `unknown include`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("unknown include")
    }
}
impl std::str::FromStr for Include {
    type Err = UnknownInclude;

    /// Parses the exact, case-sensitive strings `moreSpecifics` and
    /// `lessSpecifics`; anything else yields [`UnknownInclude`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "moreSpecifics" => Some(Self::MoreSpecifics),
            "lessSpecifics" => Some(Self::LessSpecifics),
            _ => None,
        };
        parsed.ok_or(UnknownInclude)
    }
}
async fn generic_afisafi_all(
Path(afisafi): Path<SupportedAfiSafi>,
filter: Query<QueryFilter>,
_state: State<ApiState>
) -> Result<Vec<u8>, ApiError> {
dbg!(afisafi, filter);
warn!("searching routes other than unicast not yet implemented");
Err(ApiError::InternalServerError("TODO".into()))
}
/// Searches the store for IPv4 unicast routes related to the prefix given in
/// the path and streams the result as JSON.
///
/// # Errors
///
/// * `ApiError::BadRequest` when address and length do not form a valid
///   prefix, or when the store's `search_routes` call returns an error.
/// * `ApiError::InternalServerError` when no store is currently loaded.
async fn search_ipv4unicast(
    Path((addr, len)): Path<(Ipv4Addr, u8)>,
    Query(filter): Query<QueryFilter>,
    state: State<ApiState>,
) -> Result<impl IntoResponse, ApiError> {
    let prefix = Prefix::new_v4(addr, len)
        .map_err(|e| ApiError::BadRequest(e.to_string()))?;

    let guard = state.store.load();
    let Some(store) = &*guard else {
        return Err(ApiError::InternalServerError("store unavailable".into()));
    };

    let search_result = store
        .search_routes(AfiSafiType::Ipv4Unicast, prefix, filter)
        .map_err(ApiError::BadRequest)?;

    Ok(stream_search_result(search_result))
}
async fn search_ipv4unicast_all(
mut filter: Query<QueryFilter>,
state: State<ApiState>
) -> Result<impl IntoResponse, ApiError> {
filter.enable_more_specifics();
search_ipv4unicast(Path((0.into(), 0)), filter, state).await
}
/// Searches the store for IPv6 unicast routes related to the prefix given in
/// the path and streams the result as JSON.
///
/// # Errors
///
/// * `ApiError::BadRequest` when address and length do not form a valid
///   prefix, or when the store's `search_routes` call returns an error.
/// * `ApiError::InternalServerError` when no store is currently loaded.
async fn search_ipv6unicast(
    Path((addr, len)): Path<(Ipv6Addr, u8)>,
    Query(filter): Query<QueryFilter>,
    state: State<ApiState>,
) -> Result<impl IntoResponse, ApiError> {
    let prefix = Prefix::new_v6(addr, len)
        .map_err(|e| ApiError::BadRequest(e.to_string()))?;

    let guard = state.store.load();
    let Some(store) = &*guard else {
        return Err(ApiError::InternalServerError("store unavailable".into()));
    };

    let search_result = store
        .search_routes(AfiSafiType::Ipv6Unicast, prefix, filter)
        .map_err(ApiError::BadRequest)?;

    Ok(stream_search_result(search_result))
}
async fn search_ipv6unicast_all(
mut filter: Query<QueryFilter>,
state: State<ApiState>
) -> Result<impl IntoResponse, ApiError> {
filter.enable_more_specifics();
search_ipv6unicast(Path((0.into(), 0)), filter, state).await
}