use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde_json::Value;
use crate::artifact::{cid_of, verify_artifact};
use crate::error::Error;
use crate::store::Store;
/// Abstraction over the HTTP GET used to read a peer's feed.
///
/// [`pull_from_peer`] accepts a `&dyn Fetcher`, so any implementation can be supplied.
#[async_trait]
pub trait Fetcher: Send + Sync {
/// Fetches `url` and returns the response status and body text, or an [`Error`] on failure.
async fn fetch(&self, url: &str) -> Result<FetchResponse, Error>;
}
/// Minimal HTTP response returned by a `Fetcher`: the status code plus the body as text.
///
/// Derives `Debug` and `Clone` so responses can be logged and copied, matching the
/// `Debug` derive already present on `PullResult`.
#[derive(Debug, Clone)]
pub struct FetchResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Response body as text.
    pub text: String,
}

impl FetchResponse {
    /// Returns `true` when `status` lies in the 2xx range (200 through 299 inclusive).
    pub fn ok(&self) -> bool {
        (200..300).contains(&self.status)
    }
}
/// Summary of one [`pull_from_peer`] run.
#[derive(Debug, Default)]
pub struct PullResult {
/// Number of artifacts newly inserted into the store.
pub count: usize,
/// Number of rejected feed lines; set to `reasons.len()` when the pull finishes.
pub rejected: usize,
/// Greatest `created_at` value (compared as a string) among the lines that passed every
/// check, including artifacts that were already stored; `None` if no line passed.
pub last_seen: Option<String>,
/// One message per rejected line describing why it was rejected.
pub reasons: Vec<String>,
}
/// [`Fetcher`] implementation backed by a `reqwest::Client`.
pub struct HttpFetcher {
/// Client used for every request issued through this fetcher.
client: reqwest::Client,
}
impl HttpFetcher {
    /// Creates a fetcher around a freshly constructed `reqwest::Client`.
    ///
    /// NOTE(review): per the reqwest documentation a client built with `Client::new()` has
    /// no request timeout, so a stalled peer could block a pull indefinitely — confirm
    /// whether a timeout should be configured via `Client::builder()`.
    pub fn new() -> Self {
        let client = reqwest::Client::new();
        Self { client }
    }
}
impl Default for HttpFetcher {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Fetcher for HttpFetcher {
async fn fetch(&self, url: &str) -> Result<FetchResponse, Error> {
let r = self.client.get(url).send().await
.map_err(|e| Error::Http(e.to_string()))?;
let status = r.status().as_u16();
let text = r.text().await.map_err(|e| Error::Http(e.to_string()))?;
Ok(FetchResponse { status, text })
}
}
/// Parses `s` as an RFC 3339 timestamp and returns it as milliseconds since the Unix
/// epoch (UTC), or `None` when `s` does not parse.
fn parse_iso_z(s: &str) -> Option<i64> {
    let parsed = DateTime::parse_from_rfc3339(s).ok()?;
    Some(parsed.with_timezone(&Utc).timestamp_millis())
}
pub async fn pull_from_peer(
peer_url: &str,
store: &Store,
since: Option<&str>,
fetcher: &dyn Fetcher,
now: DateTime<Utc>,
) -> Result<PullResult, Error> {
let url = match since {
Some(s) => format!("{peer_url}/feed?since={}", urlencoding(s)),
None => format!("{peer_url}/feed"),
};
let res = fetcher.fetch(&url).await?;
if !res.ok() {
return Err(Error::Http(format!("peer {peer_url} returned {}", res.status)));
}
let now_ms = now.timestamp_millis();
let mut result = PullResult::default();
let mut last_seen: Option<String> = None;
let lines: Vec<&str> = res.text.split('\n').rev().collect();
for line in lines {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let raw: Value = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(_) => {
result.reasons.push("invalid json line".into());
continue;
}
};
let v = verify_artifact(&raw);
if !v.ok {
let first = v.errors.first().cloned().unwrap_or_else(|| "unknown".into());
result.reasons.push(format!("verify: {first}"));
continue;
}
let created_at = raw.get("created_at").and_then(|v| v.as_str()).unwrap_or("");
let created_ms = match parse_iso_z(created_at) {
Some(ms) => ms,
None => {
result.reasons.push("invalid created_at".into());
continue;
}
};
if (now_ms - created_ms).abs() > 24 * 60 * 60 * 1000 {
result.reasons.push("created_at outside ±24h window".into());
continue;
}
let kind = raw.get("kind").and_then(|v| v.as_str()).unwrap_or("");
if kind == "answer" {
let qcid = raw.get("question_cid").and_then(|v| v.as_str()).unwrap_or("");
if !store.has_artifact(qcid)? {
result.reasons.push("answer references unknown question_cid".into());
continue;
}
}
if kind == "rating" {
let tcid = raw.get("target_cid").and_then(|v| v.as_str()).unwrap_or("");
if !store.has_artifact(tcid)? {
result.reasons.push("rating references unknown target_cid".into());
continue;
}
}
let ca = created_at.to_string();
match &last_seen {
None => last_seen = Some(ca.clone()),
Some(prev) if ca.as_str() > prev.as_str() => last_seen = Some(ca.clone()),
_ => {}
}
let cid = cid_of(&raw)?;
if store.has_artifact(&cid)? {
continue;
}
store.insert_artifact(&raw)?;
result.count += 1;
}
result.last_seen = last_seen;
result.rejected = result.reasons.len();
Ok(result)
}
/// Percent-encodes `s` for use as a URL query value.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are copied unchanged; every other byte of
/// the UTF-8 encoding is written as `%` followed by two uppercase hexadecimal digits.
fn urlencoding(s: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";
    s.bytes().fold(String::new(), |mut encoded, byte| {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~') {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
        }
        encoded
    })
}