#![allow(clippy::unwrap_used)]
use std::sync::Arc;
use std::time::Duration;
use drasi_lib::channels::ResultDiff;
use drasi_lib::{DrasiLib, Query};
use drasi_reaction_application::subscription::{Subscription, SubscriptionOptions};
use drasi_reaction_application::ApplicationReactionBuilder;
use drasi_source_ris_live::RisLiveSource;
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio::sync::mpsc;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
/// Commands sent from the test body to the mock RIS Live server task.
///
/// Each variant is translated by the server into one `ris_message` JSON frame
/// and pushed to the connected WebSocket client.
///
/// The common traits are derived so that actions can be printed in assertion
/// and panic messages, compared in tests, and duplicated when needed.
#[derive(Debug, Clone, PartialEq, Eq)]
enum HarnessAction {
    /// Emit an `UPDATE` message announcing the test prefix via `next_hop`.
    Announce { next_hop: String, msg_id: String },
    /// Emit an `UPDATE` message withdrawing the test prefix.
    Withdraw { msg_id: String },
    /// Emit a `RIS_PEER_STATE` message carrying the given `state`.
    PeerState { state: String, msg_id: String },
}
/// Binds a TCP listener on `127.0.0.1` with port `0` and reports the port
/// that the listener's local address ends up with.
///
/// The listener is returned alongside the port so the caller can keep using
/// the same bound socket.
///
/// # Panics
///
/// Panics if binding fails or the local address cannot be read.
async fn find_available_port() -> (tokio::net::TcpListener, u16) {
    let socket = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = socket.local_addr().unwrap();
    (socket, bound_addr.port())
}
/// Returns `true` when `value` has a string `prefix` field equal to
/// `203.0.113.0/24` and a string `next_hop` field equal to `next_hop`.
///
/// Missing or non-string fields never match.
fn routes_contains_value(value: &serde_json::Value, next_hop: &str) -> bool {
    let prefix_matches = value["prefix"].as_str() == Some("203.0.113.0/24");
    let hop_matches = value["next_hop"].as_str() == Some(next_hop);
    prefix_matches && hop_matches
}
/// Pulls results from `subscription` until one satisfies `predicate` or the
/// overall `timeout_duration` budget is used up.
///
/// Returns `true` as soon as `predicate` accepts a result. Returns `false`
/// when the deadline passes, or when `subscription.recv()` yields `None`.
/// Results rejected by `predicate` are discarded and the wait continues with
/// whatever time is left.
async fn wait_for_result<F>(
    subscription: &mut Subscription,
    timeout_duration: Duration,
    mut predicate: F,
) -> bool
where
    F: FnMut(&drasi_lib::channels::QueryResult) -> bool,
{
    let expires_at = tokio::time::Instant::now() + timeout_duration;
    loop {
        // Recompute the budget every iteration so rejected results do not
        // extend the total wait beyond the original deadline.
        let budget = expires_at.saturating_duration_since(tokio::time::Instant::now());
        if budget.is_zero() {
            break false;
        }
        match tokio::time::timeout(budget, subscription.recv()).await {
            Ok(Some(ref candidate)) if predicate(candidate) => break true,
            // Not the result we are looking for; keep waiting.
            Ok(Some(_)) => continue,
            // `recv` yielded nothing, or the remaining budget elapsed.
            Ok(None) | Err(_) => break false,
        }
    }
}
/// Builds the `ris_message` JSON frame that the mock server sends for one
/// harness action.
fn ris_payload_for_action(action: HarnessAction) -> serde_json::Value {
    match action {
        HarnessAction::Announce { next_hop, msg_id } => json!({
            "type": "ris_message",
            "data": {
                "timestamp": 1773098494.83,
                "peer": "208.80.153.193",
                "peer_asn": "14907",
                "id": msg_id,
                "host": "rrc00.ripe.net",
                "type": "UPDATE",
                "path": [14907, 3356, 64496],
                "origin": "IGP",
                "community": [[3356, 5]],
                "announcements": [
                    { "next_hop": next_hop, "prefixes": ["203.0.113.0/24"] }
                ],
                "withdrawals": []
            }
        }),
        HarnessAction::Withdraw { msg_id } => json!({
            "type": "ris_message",
            "data": {
                "timestamp": 1773098495.83,
                "peer": "208.80.153.193",
                "peer_asn": "14907",
                "id": msg_id,
                "host": "rrc00.ripe.net",
                "type": "UPDATE",
                "withdrawals": ["203.0.113.0/24"]
            }
        }),
        HarnessAction::PeerState { state, msg_id } => json!({
            "type": "ris_message",
            "data": {
                "timestamp": 1773098496.83,
                "peer": "208.80.153.193",
                "peer_asn": "14907",
                "id": msg_id,
                "host": "rrc00.ripe.net",
                "type": "RIS_PEER_STATE",
                "state": state
            }
        }),
    }
}

/// Spawns a single-connection mock RIS Live WebSocket server on `listener`.
///
/// The spawned task accepts one client, expects a text frame whose JSON
/// `type` is `ris_subscribe`, answers with a `ris_subscribe_ok` frame, and
/// then forwards every [`HarnessAction`] received on the returned channel as
/// a `ris_message` frame. Once all senders are dropped the task attempts to
/// close the WebSocket.
///
/// Any failure inside the task (accept, handshake, unexpected first frame,
/// send error) panics that task.
async fn start_mock_ris_server(listener: tokio::net::TcpListener) -> mpsc::Sender<HarnessAction> {
    let (action_tx, mut action_rx) = mpsc::channel::<HarnessAction>(32);

    tokio::spawn(async move {
        let (tcp, _) = listener.accept().await.expect("accept");
        let mut socket = accept_async(tcp).await.expect("websocket accept");

        // The first frame from the client must be the subscription request.
        let first_frame = socket
            .next()
            .await
            .expect("subscribe frame")
            .expect("frame");
        let text = match first_frame {
            Message::Text(text) => text,
            other => panic!("expected text subscribe frame, got {other:?}"),
        };
        let parsed: serde_json::Value = serde_json::from_str(&text).expect("json subscribe");
        assert_eq!(parsed["type"], "ris_subscribe");

        // Acknowledge the subscription before streaming any messages.
        let ack = json!({
            "type": "ris_subscribe_ok",
            "data": {
                "subscription": { "host": "rrc00", "type": "UPDATE" },
                "socketOptions": { "acknowledge": true, "includeRaw": false }
            }
        })
        .to_string();
        socket
            .send(Message::Text(ack))
            .await
            .expect("send subscribe ok");

        // Relay harness actions until every sender has been dropped.
        while let Some(action) = action_rx.recv().await {
            let frame = Message::Text(ris_payload_for_action(action).to_string());
            socket.send(frame).await.expect("send payload");
        }

        // Best-effort close; the client may already be gone.
        let _ = socket.close(None).await;
    });

    action_tx
}
/// Drives a `RisLiveSource` against the in-process mock WebSocket server and
/// checks that the subscribed queries report the expected diffs.
///
/// Four phases run in order, each waiting up to ten seconds:
/// 1. an announcement must surface as an `Add` on `routes-query`;
/// 2. a second announcement with a new next hop must surface as an `Update`;
/// 3. a withdrawal must surface as a `Delete`;
/// 4. a peer-state message must surface on `peer-query` with `state == "down"`.
///
/// The test is marked `#[ignore]`, so it only runs when requested explicitly.
#[tokio::test]
#[ignore]
async fn test_change_detection_with_websocket_harness() {
    // --- Mock server -------------------------------------------------------
    let (listener, port) = find_available_port().await;
    let harness_tx = start_mock_ris_server(listener).await;

    // --- Source, queries and reaction --------------------------------------
    let ws_url = format!("ws://127.0.0.1:{port}/v1/ws/");
    let source = RisLiveSource::builder("ris-source")
        .with_websocket_url(ws_url)
        .with_host("rrc00")
        .with_start_from_now()
        .with_auto_start(true)
        .build()
        .unwrap();

    let routes_query = Query::cypher("routes-query")
        .query(
            "MATCH (peer:Peer)-[r:ROUTES]->(p:Prefix) \
             RETURN peer.peer_ip AS peer, p.prefix AS prefix, r.next_hop AS next_hop",
        )
        .from_source("ris-source")
        .auto_start(true)
        .build();

    let peer_query = Query::cypher("peer-query")
        .query("MATCH (peer:Peer) RETURN peer.peer_ip AS peer, peer.state AS state")
        .from_source("ris-source")
        .auto_start(true)
        .build();

    let subscribed_queries = vec!["routes-query".to_string(), "peer-query".to_string()];
    let (reaction, handle) = ApplicationReactionBuilder::new("app-reaction")
        .with_queries(subscribed_queries)
        .build();

    let instance = DrasiLib::builder()
        .with_id("ris-live-integration-test")
        .with_source(source)
        .with_query(routes_query)
        .with_query(peer_query)
        .with_reaction(reaction)
        .build()
        .await
        .unwrap();
    let drasi = Arc::new(instance);
    drasi.start().await.unwrap();

    let subscription_options = SubscriptionOptions::default().with_timeout(Duration::from_secs(1));
    let mut subscription = handle
        .subscribe_with_options(subscription_options)
        .await
        .unwrap();

    // --- Phase 1: first announcement shows up as an Add ---------------------
    let insert_action = HarnessAction::Announce {
        next_hop: "198.51.100.1".to_string(),
        msg_id: "msg-insert".to_string(),
    };
    harness_tx.send(insert_action).await.unwrap();

    let found_insert = wait_for_result(&mut subscription, Duration::from_secs(10), |result| {
        result.query_id == "routes-query"
            && result.results.iter().any(|diff| {
                matches!(
                    diff,
                    ResultDiff::Add { data, .. } if routes_contains_value(data, "198.51.100.1")
                )
            })
    })
    .await;
    assert!(found_insert, "INSERT announcement was not detected");

    // --- Phase 2: changed next hop shows up as an Update --------------------
    let update_action = HarnessAction::Announce {
        next_hop: "198.51.100.2".to_string(),
        msg_id: "msg-update".to_string(),
    };
    harness_tx.send(update_action).await.unwrap();

    let found_update = wait_for_result(&mut subscription, Duration::from_secs(10), |result| {
        result.query_id == "routes-query"
            && result.results.iter().any(|diff| {
                matches!(
                    diff,
                    ResultDiff::Update { after, .. } if routes_contains_value(after, "198.51.100.2")
                )
            })
    })
    .await;
    assert!(found_update, "UPDATE announcement was not detected");

    // --- Phase 3: withdrawal shows up as a Delete ---------------------------
    let withdraw_action = HarnessAction::Withdraw {
        msg_id: "msg-delete".to_string(),
    };
    harness_tx.send(withdraw_action).await.unwrap();

    let found_delete = wait_for_result(&mut subscription, Duration::from_secs(10), |result| {
        result.query_id == "routes-query"
            && result.results.iter().any(|diff| {
                matches!(
                    diff,
                    ResultDiff::Delete { data, .. } if data["prefix"] == "203.0.113.0/24"
                )
            })
    })
    .await;
    assert!(found_delete, "DELETE withdrawal was not detected");

    // --- Phase 4: peer state change reaches the peer query ------------------
    let peer_state_action = HarnessAction::PeerState {
        state: "down".to_string(),
        msg_id: "msg-peer-state".to_string(),
    };
    harness_tx.send(peer_state_action).await.unwrap();

    let found_peer_state = wait_for_result(&mut subscription, Duration::from_secs(10), |result| {
        // Either an Update or an Add carrying the new state is accepted.
        result.query_id == "peer-query"
            && result.results.iter().any(|diff| match diff {
                ResultDiff::Update { after, .. } => after["state"] == "down",
                ResultDiff::Add { data, .. } => data["state"] == "down",
                _ => false,
            })
    })
    .await;
    assert!(found_peer_state, "RIS_PEER_STATE update was not detected");

    drasi.stop().await.unwrap();
}