Skip to main content

agent_ask/
federation.rs

1//! Pull-import federation (mirror of `src/federation.ts`).
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use serde_json::Value;
6
7use crate::artifact::{cid_of, verify_artifact};
8use crate::error::Error;
9use crate::store::Store;
10
11#[async_trait]
12pub trait Fetcher: Send + Sync {
13    async fn fetch(&self, url: &str) -> Result<FetchResponse, Error>;
14}
15
/// Minimal view of a fetched response: its status code and its body as text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchResponse {
    /// Numeric status code of the response.
    pub status: u16,
    /// Response body as text.
    pub text: String,
}

impl FetchResponse {
    /// Returns `true` when `status` lies in the 2xx range (200 through 299 inclusive).
    pub fn ok(&self) -> bool {
        matches!(self.status, 200..=299)
    }
}
26
/// Summary of one pull-import pass, as filled in by `pull_from_peer`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PullResult {
    /// Number of artifacts newly inserted into the store during the pass.
    pub count: usize,
    /// Number of rejected feed lines; set to `reasons.len()` at the end of the pass.
    pub rejected: usize,
    /// Greatest `created_at` string (by string comparison) among the artifacts that
    /// passed every check, including ones that were already stored. `None` if no
    /// artifact passed.
    pub last_seen: Option<String>,
    /// One message per rejected feed line, in processing order.
    pub reasons: Vec<String>,
}
34
35pub struct HttpFetcher {
36    client: reqwest::Client,
37}
38
39impl HttpFetcher {
40    pub fn new() -> Self {
41        Self { client: reqwest::Client::new() }
42    }
43}
44
45impl Default for HttpFetcher {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51#[async_trait]
52impl Fetcher for HttpFetcher {
53    async fn fetch(&self, url: &str) -> Result<FetchResponse, Error> {
54        let r = self.client.get(url).send().await
55            .map_err(|e| Error::Http(e.to_string()))?;
56        let status = r.status().as_u16();
57        let text = r.text().await.map_err(|e| Error::Http(e.to_string()))?;
58        Ok(FetchResponse { status, text })
59    }
60}
61
62fn parse_iso_z(s: &str) -> Option<i64> {
63    DateTime::parse_from_rfc3339(s).ok().map(|d| d.with_timezone(&Utc).timestamp_millis())
64}
65
66pub async fn pull_from_peer(
67    peer_url: &str,
68    store: &Store,
69    since: Option<&str>,
70    fetcher: &dyn Fetcher,
71    now: DateTime<Utc>,
72) -> Result<PullResult, Error> {
73    let url = match since {
74        Some(s) => format!("{peer_url}/feed?since={}", urlencoding(s)),
75        None => format!("{peer_url}/feed"),
76    };
77    let res = fetcher.fetch(&url).await?;
78    if !res.ok() {
79        return Err(Error::Http(format!("peer {peer_url} returned {}", res.status)));
80    }
81    let now_ms = now.timestamp_millis();
82
83    let mut result = PullResult::default();
84    let mut last_seen: Option<String> = None;
85
86    // Reverse so older artifacts are processed first (so answer→question dep works).
87    let lines: Vec<&str> = res.text.split('\n').rev().collect();
88    for line in lines {
89        let trimmed = line.trim();
90        if trimmed.is_empty() {
91            continue;
92        }
93        let raw: Value = match serde_json::from_str(trimmed) {
94            Ok(v) => v,
95            Err(_) => {
96                result.reasons.push("invalid json line".into());
97                continue;
98            }
99        };
100        let v = verify_artifact(&raw);
101        if !v.ok {
102            let first = v.errors.first().cloned().unwrap_or_else(|| "unknown".into());
103            result.reasons.push(format!("verify: {first}"));
104            continue;
105        }
106        let created_at = raw.get("created_at").and_then(|v| v.as_str()).unwrap_or("");
107        let created_ms = match parse_iso_z(created_at) {
108            Some(ms) => ms,
109            None => {
110                result.reasons.push("invalid created_at".into());
111                continue;
112            }
113        };
114        if (now_ms - created_ms).abs() > 24 * 60 * 60 * 1000 {
115            result.reasons.push("created_at outside ±24h window".into());
116            continue;
117        }
118        let kind = raw.get("kind").and_then(|v| v.as_str()).unwrap_or("");
119        if kind == "answer" {
120            let qcid = raw.get("question_cid").and_then(|v| v.as_str()).unwrap_or("");
121            if !store.has_artifact(qcid)? {
122                result.reasons.push("answer references unknown question_cid".into());
123                continue;
124            }
125        }
126        if kind == "rating" {
127            let tcid = raw.get("target_cid").and_then(|v| v.as_str()).unwrap_or("");
128            if !store.has_artifact(tcid)? {
129                result.reasons.push("rating references unknown target_cid".into());
130                continue;
131            }
132        }
133        let ca = created_at.to_string();
134        match &last_seen {
135            None => last_seen = Some(ca.clone()),
136            Some(prev) if ca.as_str() > prev.as_str() => last_seen = Some(ca.clone()),
137            _ => {}
138        }
139        let cid = cid_of(&raw)?;
140        if store.has_artifact(&cid)? {
141            continue;
142        }
143        store.insert_artifact(&raw)?;
144        result.count += 1;
145    }
146    result.last_seen = last_seen;
147    result.rejected = result.reasons.len();
148    Ok(result)
149}
150
/// Percent-encodes `s` for use as a URL query value.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are copied through unchanged; every other
/// byte of the UTF-8 encoding of `s` is written as `%XX` with uppercase hex digits. For
/// canonical timestamps (which contain only digits, '-', ':', 'T', 'Z') the output matches
/// TS `encodeURIComponent`.
fn urlencoding(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    // Every input byte yields at least one output byte, so this is a lower bound that
    // avoids most regrowth.
    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                out.push(char::from(b));
            }
            _ => {
                // Push the escape directly instead of allocating a temporary String
                // through `format!` for each escaped byte.
                out.push('%');
                out.push(char::from(HEX[usize::from(b >> 4)]));
                out.push(char::from(HEX[usize::from(b & 0x0F)]));
            }
        }
    }
    out
}