use anyhow::{anyhow, bail, Context, Result};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::env;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, tungstenite};
#[derive(Debug, Clone, Serialize)]
struct RequestEnvelope {
id: String,
method: String,
#[serde(default)]
params: Value,
}
#[derive(Debug, Clone, Deserialize)]
struct ResponseEnvelope {
id: String,
#[serde(default)]
op_id: Option<String>,
#[serde(rename = "type")]
response_type: String,
data: Value,
}
async fn call_once(
write: &mut (impl SinkExt<Message, Error = tungstenite::Error> + Unpin),
read: &mut (impl StreamExt<Item = std::result::Result<Message, tungstenite::Error>> + Unpin),
id: &str,
method: &str,
params: Value,
) -> Result<ResponseEnvelope> {
let req = RequestEnvelope {
id: id.to_string(),
method: method.to_string(),
params,
};
let text = serde_json::to_string(&req).context("serialize request")?;
write
.send(Message::Text(text))
.await
.with_context(|| format!("send request failed (id={id} method={method})"))?;
while let Some(msg) = read.next().await {
let msg = msg.context("read ws message")?;
let text = match msg {
Message::Text(s) => s,
Message::Binary(b) => String::from_utf8_lossy(&b).to_string(),
_ => continue,
};
let env: ResponseEnvelope =
serde_json::from_str(&text).with_context(|| format!("parse response: {text}"))?;
if env.id != id {
eprintln!(
"rx (unmatched): id={} type={} op_id={:?}",
env.id, env.response_type, env.op_id
);
continue;
}
if env.op_id.is_some() {
bail!(
"protocol violation: non-streaming response included op_id: {:?}",
env.op_id
);
}
if env.response_type != "result" && env.response_type != "error" {
bail!(
"unexpected response type for non-streaming call: {}",
env.response_type
);
}
if env.response_type == "error" {
return Err(anyhow!(
"server returned error for {method} (id={id}): {}",
env.data
));
}
return Ok(env);
}
bail!("connection closed before response for id={id} method={method}");
}
#[tokio::main]
async fn main() -> Result<()> {
let url = env::var("MONOCLE_WS_URL").unwrap_or_else(|_| "ws://127.0.0.1:3000/ws".to_string());
eprintln!("connecting: {url}");
let (ws, _resp) = connect_async(&url).await.context("connect websocket")?;
eprintln!("connected");
let (mut write, mut read) = ws.split();
let r = call_once(
&mut write,
&mut read,
"1",
"system.info",
serde_json::json!({}),
)
.await?;
println!("\nsystem.info => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"2",
"time.parse",
serde_json::json!({
"times": ["1700000000", "2024-01-01T00:00:00Z"],
"format": null
}),
)
.await?;
println!("\ntime.parse => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"3",
"country.lookup",
serde_json::json!({ "query": "US" }),
)
.await?;
println!("\ncountry.lookup => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"4",
"ip.lookup",
serde_json::json!({ "ip": "1.1.1.1", "simple": false }),
)
.await?;
println!("\nip.lookup => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"5",
"ip.public",
serde_json::json!({}),
)
.await?;
println!("\nip.public => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"6",
"rpki.validate",
serde_json::json!({ "prefix": "1.1.1.0/24", "asn": 13335 }),
)
.await?;
println!("\nrpki.validate => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"7",
"rpki.roas",
serde_json::json!({
"asn": 13335,
"prefix": null,
"date": null,
"source": "cloudflare"
}),
)
.await?;
println!("\nrpki.roas => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"8",
"rpki.aspas",
serde_json::json!({
"customer_asn": 13335,
"provider_asn": null,
"date": null,
"source": "cloudflare"
}),
)
.await?;
println!("\nrpki.aspas => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"9",
"as2org.search",
serde_json::json!({
"query": "cloudflare",
"asn_only": false,
"name_only": true,
"country_only": false,
"full_country": false,
"full_table": false
}),
)
.await?;
println!("\nas2org.search => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"10",
"as2org.bootstrap",
serde_json::json!({ "force": false }),
)
.await?;
println!("\nas2org.bootstrap => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"11",
"as2rel.search",
serde_json::json!({ "asns": [13335], "sort_by_asn": false, "show_name": true }),
)
.await?;
println!("\nas2rel.search => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"12",
"as2rel.relationship",
serde_json::json!({ "asn1": 13335, "asn2": 174 }),
)
.await?;
println!("\nas2rel.relationship => {:#}", r.data);
match call_once(
&mut write,
&mut read,
"13",
"as2rel.update",
serde_json::json!({ "url": null }),
)
.await
{
Ok(r) => println!("\nas2rel.update (unexpected success) => {:#}", r.data),
Err(e) => println!("\nas2rel.update (expected failure) => {e}"),
}
let r = call_once(
&mut write,
&mut read,
"14",
"pfx2as.lookup",
serde_json::json!({ "prefix": "1.1.1.0/24", "mode": "longest" }),
)
.await?;
println!("\npfx2as.lookup => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"15",
"database.status",
serde_json::json!({}),
)
.await?;
println!("\ndatabase.status => {:#}", r.data);
let r = call_once(
&mut write,
&mut read,
"16",
"database.refresh",
serde_json::json!({ "source": "pfx2as-cache", "force": false }),
)
.await?;
println!("\ndatabase.refresh => {:#}", r.data);
let _ = write.send(Message::Close(None)).await;
Ok(())
}