use async_trait::async_trait;
use std::sync::{Arc, Mutex};
use agent_ask::{
build_answer, build_question, cid_of, create_app, generate_keypair, pull_from_peer, AppState,
BuildAnswerOpts, BuildQuestionOpts, FetchResponse, Fetcher, Store,
};
use axum::body::Body;
use axum::http::Request;
use chrono::Utc;
use http_body_util::BodyExt;
use serde_json::{json, Value};
use tower::util::ServiceExt;
struct RouterFetcher {
app: Arc<Mutex<Option<axum::Router>>>,
}
impl RouterFetcher {
fn new(app: axum::Router) -> Self {
Self { app: Arc::new(Mutex::new(Some(app))) }
}
}
#[async_trait]
impl Fetcher for RouterFetcher {
async fn fetch(&self, url: &str) -> Result<FetchResponse, agent_ask::Error> {
let path = url.strip_prefix("http://peer").unwrap_or(url).to_string();
let router = {
let g = self.app.lock().unwrap();
g.as_ref().unwrap().clone()
};
let req = Request::builder().method("GET").uri(&path).body(Body::empty()).unwrap();
let res = router.oneshot(req).await.unwrap();
let status = res.status().as_u16();
let body = res.into_body().collect().await.unwrap().to_bytes();
let text = String::from_utf8(body.to_vec()).unwrap_or_default();
Ok(FetchResponse { status, text })
}
}
struct TextFetcher {
text: String,
}
#[async_trait]
impl Fetcher for TextFetcher {
async fn fetch(&self, _url: &str) -> Result<FetchResponse, agent_ask::Error> {
Ok(FetchResponse { status: 200, text: self.text.clone() })
}
}
async fn post_json(app: axum::Router, path: &str, body: &Value) {
let bytes = serde_json::to_vec(body).unwrap();
let req = Request::builder().method("POST").uri(path)
.header("content-type", "application/json")
.header("content-length", bytes.len().to_string())
.body(Body::from(bytes)).unwrap();
let _ = app.oneshot(req).await.unwrap();
}
#[tokio::test]
async fn pull_ingests_all() {
let peer_store = Store::open(":memory:").unwrap();
let local_store = Store::open(":memory:").unwrap();
let peer_app = create_app(AppState::new(peer_store.clone()));
let kp = generate_keypair();
let q = build_question(&kp, BuildQuestionOpts {
title: "t".into(), body: "b".into(), tags: vec![], ..Default::default()
}).unwrap();
post_json(peer_app.clone(), "/questions", &q).await;
let q_cid = cid_of(&q).unwrap();
let a = build_answer(&kp, BuildAnswerOpts {
question_cid: q_cid.clone(), body: "ans".into(), ..Default::default()
}).unwrap();
let a_cid = cid_of(&a).unwrap();
post_json(peer_app.clone(), "/answers", &a).await;
let fetcher = RouterFetcher::new(peer_app);
let res = pull_from_peer(
"http://peer", &local_store, None, &fetcher, Utc::now(),
).await.unwrap();
assert_eq!(res.count, 2);
assert_eq!(res.last_seen.as_deref(), Some(a["created_at"].as_str().unwrap()));
assert_eq!(local_store.get_artifact(&q_cid).unwrap().as_ref(), Some(&q));
assert_eq!(local_store.get_artifact(&a_cid).unwrap().as_ref(), Some(&a));
}
#[tokio::test]
async fn pull_idempotent() {
let peer_store = Store::open(":memory:").unwrap();
let local_store = Store::open(":memory:").unwrap();
let peer_app = create_app(AppState::new(peer_store.clone()));
let kp = generate_keypair();
let q = build_question(&kp, BuildQuestionOpts {
title: "t".into(), body: "b".into(), tags: vec![], ..Default::default()
}).unwrap();
post_json(peer_app.clone(), "/questions", &q).await;
let fetcher = RouterFetcher::new(peer_app);
let first = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now()).await.unwrap();
let second = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now()).await.unwrap();
assert_eq!(first.count, 1);
assert_eq!(second.count, 0);
}
#[tokio::test]
async fn pull_discards_invalid() {
let local_store = Store::open(":memory:").unwrap();
let kp = generate_keypair();
let q = build_question(&kp, BuildQuestionOpts {
title: "t".into(), body: "b".into(), tags: vec![], ..Default::default()
}).unwrap();
let mut tampered = q.clone();
tampered["body"] = json!("MUTATED");
let feed = format!("{}\n", serde_json::to_string(&tampered).unwrap());
let fetcher = TextFetcher { text: feed };
let res = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now()).await.unwrap();
assert_eq!(res.count, 0);
assert_eq!(res.rejected, 1);
assert_eq!(res.reasons.len(), 1);
assert!(res.reasons[0].contains("verify"));
assert!(!local_store.has_artifact(&cid_of(&q).unwrap()).unwrap());
}
#[tokio::test]
async fn pull_advances_lastseen_on_duplicate() {
let peer_store = Store::open(":memory:").unwrap();
let local_store = Store::open(":memory:").unwrap();
let peer_app = create_app(AppState::new(peer_store.clone()));
let kp = generate_keypair();
let q = build_question(&kp, BuildQuestionOpts {
title: "t".into(), body: "b".into(), tags: vec![], ..Default::default()
}).unwrap();
post_json(peer_app.clone(), "/questions", &q).await;
let fetcher = RouterFetcher::new(peer_app);
let first = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now()).await.unwrap();
let second = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now()).await.unwrap();
assert_eq!(first.last_seen.as_deref(), Some(q["created_at"].as_str().unwrap()));
assert_eq!(second.count, 0);
assert_eq!(second.last_seen.as_deref(), Some(q["created_at"].as_str().unwrap()));
}
#[tokio::test]
async fn pull_reasons_enumerate() {
let local_store = Store::open(":memory:").unwrap();
let kp = generate_keypair();
let q = build_question(&kp, BuildQuestionOpts {
title: "t".into(), body: "b".into(), tags: vec![], ..Default::default()
}).unwrap();
let mut tampered = q.clone();
tampered["body"] = json!("X");
let feed = format!("{{not json}}\n{}", serde_json::to_string(&tampered).unwrap());
let fetcher = TextFetcher { text: feed };
let res = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now()).await.unwrap();
assert_eq!(res.rejected, 2);
assert!(res.reasons.iter().any(|r| r == "invalid json line"));
assert!(res.reasons.iter().any(|r| r.starts_with("verify:")));
}