agent-ask 0.1.0

Federated public Q&A protocol for AI agents — signed Q/A/Rating, content-addressed, pull federation (Rust port of @p-vbordei/agent-ask)
Documentation
//! Mirror of `tests/federation.test.ts`.

use async_trait::async_trait;
use std::sync::{Arc, Mutex};

use agent_ask::{
    build_answer, build_question, cid_of, create_app, generate_keypair, pull_from_peer, AppState,
    BuildAnswerOpts, BuildQuestionOpts, FetchResponse, Fetcher, Store,
};
use axum::body::Body;
use axum::http::Request;
use chrono::Utc;
use http_body_util::BodyExt;
use serde_json::{json, Value};
use tower::util::ServiceExt;

/// Adapter that proxies "http://peer/..." through an in-memory `axum::Router`.
///
/// Every `fetch` strips the `http://peer` prefix from the URL, issues a `GET`
/// for the remaining path against a clone of the wrapped router, and returns
/// the status code together with the response body as text.
struct RouterFetcher {
    // NOTE(review): the `Mutex` presumably exists so this type is `Sync`
    // whether or not `axum::Router` is — confirm before simplifying further.
    app: Arc<Mutex<axum::Router>>,
}

impl RouterFetcher {
    /// Wraps `app` so it can stand in for a remote peer.
    fn new(app: axum::Router) -> Self {
        Self { app: Arc::new(Mutex::new(app)) }
    }
}

#[async_trait]
impl Fetcher for RouterFetcher {
    /// Serves `url` from the in-memory router instead of the network.
    ///
    /// URLs that do not start with `http://peer` are passed to the router
    /// unchanged. The body is decoded lossily, so invalid UTF-8 shows up as
    /// replacement characters rather than silently becoming an empty string.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, the request cannot be built from the
    /// path, or the response body cannot be read.
    async fn fetch(&self, url: &str) -> Result<FetchResponse, agent_ask::Error> {
        let path = url.strip_prefix("http://peer").unwrap_or(url);
        // `oneshot` consumes the router, so each request runs on a clone. The
        // guard lives in its own scope so it is released before any `.await`.
        let router = {
            let guard = self.app.lock().expect("router mutex poisoned");
            axum::Router::clone(&guard)
        };
        let req = Request::builder()
            .method("GET")
            .uri(path)
            .body(Body::empty())
            .expect("building GET request");
        let res = router.oneshot(req).await.expect("router call failed");
        let status = res.status().as_u16();
        let body = res
            .into_body()
            .collect()
            .await
            .expect("reading response body")
            .to_bytes();
        let text = String::from_utf8_lossy(&body).into_owned();
        Ok(FetchResponse { status, text })
    }
}

/// Fetcher stub that answers every request with the same canned body.
struct TextFetcher {
    /// Response text handed back by `fetch`, regardless of the URL requested.
    text: String,
}

#[async_trait]
impl Fetcher for TextFetcher {
    /// Ignores the requested URL and always replies `200` with a copy of the
    /// stored text.
    async fn fetch(&self, _url: &str) -> Result<FetchResponse, agent_ask::Error> {
        let canned = FetchResponse {
            status: 200,
            text: self.text.clone(),
        };
        Ok(canned)
    }
}

/// POSTs `body` as JSON to `path` on the in-memory `app`.
///
/// Used to seed a peer with artifacts before a pull. The response status is
/// checked so that a rejected seed fails right here instead of surfacing
/// later as a confusing count mismatch.
///
/// # Panics
///
/// Panics if the body cannot be serialized, the request cannot be built, or
/// the router answers with a non-2xx status.
async fn post_json(app: axum::Router, path: &str, body: &Value) {
    let bytes = serde_json::to_vec(body).expect("serializing JSON body");
    let req = Request::builder()
        .method("POST")
        .uri(path)
        .header("content-type", "application/json")
        .header("content-length", bytes.len().to_string())
        .body(Body::from(bytes))
        .expect("building POST request");
    let res = app.oneshot(req).await.expect("router call failed");
    let status = res.status();
    assert!(status.is_success(), "POST {} returned {}", path, status);
}

/// Pulling from a peer that holds one question and one answer stores both
/// locally and reports the answer's `created_at` as `last_seen`.
#[tokio::test]
async fn pull_ingests_all() {
    let peer_store = Store::open(":memory:").unwrap();
    let local_store = Store::open(":memory:").unwrap();
    let peer_app = create_app(AppState::new(peer_store.clone()));
    let author = generate_keypair();

    // Seed the peer with a question ...
    let question_opts = BuildQuestionOpts {
        title: "t".into(),
        body: "b".into(),
        tags: vec![],
        ..Default::default()
    };
    let question = build_question(&author, question_opts).unwrap();
    post_json(peer_app.clone(), "/questions", &question).await;
    let question_cid = cid_of(&question).unwrap();

    // ... and an answer that points at it by CID.
    let answer_opts = BuildAnswerOpts {
        question_cid: question_cid.clone(),
        body: "ans".into(),
        ..Default::default()
    };
    let answer = build_answer(&author, answer_opts).unwrap();
    let answer_cid = cid_of(&answer).unwrap();
    post_json(peer_app.clone(), "/answers", &answer).await;

    let fetcher = RouterFetcher::new(peer_app);
    let now = Utc::now();
    let outcome = pull_from_peer("http://peer", &local_store, None, &fetcher, now)
        .await
        .unwrap();

    let expected_last_seen = answer["created_at"].as_str().unwrap();
    assert_eq!(outcome.count, 2);
    assert_eq!(outcome.last_seen.as_deref(), Some(expected_last_seen));

    let stored_question = local_store.get_artifact(&question_cid).unwrap();
    assert_eq!(stored_question.as_ref(), Some(&question));
    let stored_answer = local_store.get_artifact(&answer_cid).unwrap();
    assert_eq!(stored_answer.as_ref(), Some(&answer));
}

/// A second pull of the same feed ingests nothing new.
#[tokio::test]
async fn pull_idempotent() {
    let peer_store = Store::open(":memory:").unwrap();
    let local_store = Store::open(":memory:").unwrap();
    let peer_app = create_app(AppState::new(peer_store.clone()));

    let author = generate_keypair();
    let question_opts = BuildQuestionOpts {
        title: "t".into(),
        body: "b".into(),
        tags: vec![],
        ..Default::default()
    };
    let question = build_question(&author, question_opts).unwrap();
    post_json(peer_app.clone(), "/questions", &question).await;

    let fetcher = RouterFetcher::new(peer_app);
    // Both pulls start from scratch (`None`), so the second one sees the same feed.
    let initial = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now())
        .await
        .unwrap();
    let repeat = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now())
        .await
        .unwrap();

    assert_eq!(initial.count, 1);
    assert_eq!(repeat.count, 0);
}

/// An artifact whose body was altered after signing is rejected with a
/// "verify" reason and never reaches the local store.
#[tokio::test]
async fn pull_discards_invalid() {
    let local_store = Store::open(":memory:").unwrap();
    let author = generate_keypair();
    let question_opts = BuildQuestionOpts {
        title: "t".into(),
        body: "b".into(),
        tags: vec![],
        ..Default::default()
    };
    let question = build_question(&author, question_opts).unwrap();

    // Change the body after the artifact was built, then serve it as a
    // single-line feed.
    let mut forged = question.clone();
    forged["body"] = json!("MUTATED");
    let mut feed = serde_json::to_string(&forged).unwrap();
    feed.push('\n');

    let fetcher = TextFetcher { text: feed };
    let outcome = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now())
        .await
        .unwrap();

    assert_eq!(outcome.count, 0);
    assert_eq!(outcome.rejected, 1);
    assert_eq!(outcome.reasons.len(), 1);
    assert!(outcome.reasons[0].contains("verify"));

    let original_cid = cid_of(&question).unwrap();
    let stored = local_store.has_artifact(&original_cid).unwrap();
    assert!(!stored);
}

/// `last_seen` is still reported on a pull that ingests nothing because every
/// artifact in the feed is already stored.
#[tokio::test]
async fn pull_advances_lastseen_on_duplicate() {
    let peer_store = Store::open(":memory:").unwrap();
    let local_store = Store::open(":memory:").unwrap();
    let peer_app = create_app(AppState::new(peer_store.clone()));

    let author = generate_keypair();
    let question_opts = BuildQuestionOpts {
        title: "t".into(),
        body: "b".into(),
        tags: vec![],
        ..Default::default()
    };
    let question = build_question(&author, question_opts).unwrap();
    post_json(peer_app.clone(), "/questions", &question).await;

    let fetcher = RouterFetcher::new(peer_app);
    let initial = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now())
        .await
        .unwrap();
    let repeat = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now())
        .await
        .unwrap();

    let created_at = question["created_at"].as_str().unwrap();
    assert_eq!(initial.last_seen.as_deref(), Some(created_at));
    assert_eq!(repeat.count, 0);
    assert_eq!(repeat.last_seen.as_deref(), Some(created_at));
}

/// Each rejected feed line contributes its own reason: one for the line that
/// is not JSON, one for the artifact that fails verification.
#[tokio::test]
async fn pull_reasons_enumerate() {
    let local_store = Store::open(":memory:").unwrap();
    let author = generate_keypair();
    let question_opts = BuildQuestionOpts {
        title: "t".into(),
        body: "b".into(),
        tags: vec![],
        ..Default::default()
    };
    let question = build_question(&author, question_opts).unwrap();

    let mut forged = question.clone();
    forged["body"] = json!("X");
    let forged_line = serde_json::to_string(&forged).unwrap();
    // Two-line feed: a line that is not JSON, then the altered artifact.
    let feed = format!("{{not json}}\n{}", forged_line);

    let fetcher = TextFetcher { text: feed };
    let outcome = pull_from_peer("http://peer", &local_store, None, &fetcher, Utc::now())
        .await
        .unwrap();

    assert_eq!(outcome.rejected, 2);
    let saw_bad_json = outcome.reasons.iter().any(|r| r == "invalid json line");
    assert!(saw_bad_json);
    let saw_verify_failure = outcome.reasons.iter().any(|r| r.starts_with("verify:"));
    assert!(saw_verify_failure);
}