agent-ask 0.1.0

Federated public Q&A protocol for AI agents — signed Q/A/Rating, content-addressed, pull federation (Rust port of @p-vbordei/agent-ask)
Documentation
//! Pull-import federation (mirror of `src/federation.ts`).

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde_json::Value;

use crate::artifact::{cid_of, verify_artifact};
use crate::error::Error;
use crate::store::Store;

/// Transport abstraction used by [`pull_from_peer`] to retrieve a peer's feed.
///
/// `pull_from_peer` takes the fetcher as `&dyn Fetcher`, so any implementation
/// (the real [`HttpFetcher`] or a substitute) can be passed in.
#[async_trait]
pub trait Fetcher: Send + Sync {
    /// Retrieves `url` and returns the response status together with its body text.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the request cannot be completed. How non-2xx
    /// statuses are reported is up to the implementation; [`HttpFetcher`] returns
    /// them as `Ok` and leaves the check to the caller via [`FetchResponse::ok`].
    async fn fetch(&self, url: &str) -> Result<FetchResponse, Error>;
}

/// Result of a single [`Fetcher::fetch`] call: the status code and the body as text.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so responses can be logged,
/// duplicated and compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchResponse {
    /// Numeric HTTP status code of the response.
    pub status: u16,
    /// Response body decoded as text.
    pub text: String,
}

impl FetchResponse {
    /// Returns `true` when the status code is in the 2xx range (200 through 299).
    pub fn ok(&self) -> bool {
        matches!(self.status, 200..=299)
    }
}

/// Summary of one [`pull_from_peer`] run.
#[derive(Debug, Default)]
pub struct PullResult {
    /// Number of artifacts newly inserted into the store during this pull.
    pub count: usize,
    /// Number of feed lines that were rejected; set to `reasons.len()`.
    pub rejected: usize,
    /// Greatest `created_at` string (by string comparison) among the artifacts
    /// that passed every check, including ones already present in the store.
    /// `None` when no artifact passed.
    pub last_seen: Option<String>,
    /// One human-readable reason per rejected feed line, in processing order.
    pub reasons: Vec<String>,
}

/// [`Fetcher`] implementation backed by a `reqwest::Client`.
///
/// Derives `Debug` and `Clone` (both implemented by `reqwest::Client`) so the
/// fetcher can be logged and shared between callers.
#[derive(Debug, Clone)]
pub struct HttpFetcher {
    /// HTTP client used for every request made through this fetcher.
    client: reqwest::Client,
}

impl HttpFetcher {
    /// Creates a fetcher around a freshly constructed `reqwest::Client`.
    pub fn new() -> Self {
        let client = reqwest::Client::new();
        Self { client }
    }
}

impl Default for HttpFetcher {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl Fetcher for HttpFetcher {
    async fn fetch(&self, url: &str) -> Result<FetchResponse, Error> {
        let r = self.client.get(url).send().await
            .map_err(|e| Error::Http(e.to_string()))?;
        let status = r.status().as_u16();
        let text = r.text().await.map_err(|e| Error::Http(e.to_string()))?;
        Ok(FetchResponse { status, text })
    }
}

/// Parses an RFC 3339 timestamp and returns it as milliseconds since the Unix
/// epoch (after conversion to UTC), or `None` when `s` does not parse.
fn parse_iso_z(s: &str) -> Option<i64> {
    let parsed = DateTime::parse_from_rfc3339(s).ok()?;
    Some(parsed.with_timezone(&Utc).timestamp_millis())
}

pub async fn pull_from_peer(
    peer_url: &str,
    store: &Store,
    since: Option<&str>,
    fetcher: &dyn Fetcher,
    now: DateTime<Utc>,
) -> Result<PullResult, Error> {
    let url = match since {
        Some(s) => format!("{peer_url}/feed?since={}", urlencoding(s)),
        None => format!("{peer_url}/feed"),
    };
    let res = fetcher.fetch(&url).await?;
    if !res.ok() {
        return Err(Error::Http(format!("peer {peer_url} returned {}", res.status)));
    }
    let now_ms = now.timestamp_millis();

    let mut result = PullResult::default();
    let mut last_seen: Option<String> = None;

    // Reverse so older artifacts are processed first (so answer→question dep works).
    let lines: Vec<&str> = res.text.split('\n').rev().collect();
    for line in lines {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let raw: Value = match serde_json::from_str(trimmed) {
            Ok(v) => v,
            Err(_) => {
                result.reasons.push("invalid json line".into());
                continue;
            }
        };
        let v = verify_artifact(&raw);
        if !v.ok {
            let first = v.errors.first().cloned().unwrap_or_else(|| "unknown".into());
            result.reasons.push(format!("verify: {first}"));
            continue;
        }
        let created_at = raw.get("created_at").and_then(|v| v.as_str()).unwrap_or("");
        let created_ms = match parse_iso_z(created_at) {
            Some(ms) => ms,
            None => {
                result.reasons.push("invalid created_at".into());
                continue;
            }
        };
        if (now_ms - created_ms).abs() > 24 * 60 * 60 * 1000 {
            result.reasons.push("created_at outside ±24h window".into());
            continue;
        }
        let kind = raw.get("kind").and_then(|v| v.as_str()).unwrap_or("");
        if kind == "answer" {
            let qcid = raw.get("question_cid").and_then(|v| v.as_str()).unwrap_or("");
            if !store.has_artifact(qcid)? {
                result.reasons.push("answer references unknown question_cid".into());
                continue;
            }
        }
        if kind == "rating" {
            let tcid = raw.get("target_cid").and_then(|v| v.as_str()).unwrap_or("");
            if !store.has_artifact(tcid)? {
                result.reasons.push("rating references unknown target_cid".into());
                continue;
            }
        }
        let ca = created_at.to_string();
        match &last_seen {
            None => last_seen = Some(ca.clone()),
            Some(prev) if ca.as_str() > prev.as_str() => last_seen = Some(ca.clone()),
            _ => {}
        }
        let cid = cid_of(&raw)?;
        if store.has_artifact(&cid)? {
            continue;
        }
        store.insert_artifact(&raw)?;
        result.count += 1;
    }
    result.last_seen = last_seen;
    result.rejected = result.reasons.len();
    Ok(result)
}

/// Percent-encodes `s` for use as a URL query value.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are copied through unchanged;
/// every other byte of the UTF-8 encoding becomes `%XX` with uppercase hex.
/// For canonical timestamps (digits, '-', ':', 'T', 'Z') this matches what the
/// TS `encodeURIComponent` produces.
fn urlencoding(s: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let unreserved =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(char::from(byte));
        } else {
            // High nibble first, then low nibble, e.g. b':' (0x3A) -> "%3A".
            encoded.push('%');
            encoded.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}