1use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use serde_json::Value;
6
7use crate::artifact::{cid_of, verify_artifact};
8use crate::error::Error;
9use crate::store::Store;
10
11#[async_trait]
12pub trait Fetcher: Send + Sync {
13 async fn fetch(&self, url: &str) -> Result<FetchResponse, Error>;
14}
15
/// Status code and body text of a completed fetch.
#[derive(Debug, Clone)]
pub struct FetchResponse {
    /// Numeric HTTP status code.
    pub status: u16,
    /// Response body as text.
    pub text: String,
}

impl FetchResponse {
    /// Returns `true` when `status` lies in the range `200..=299`.
    pub fn ok(&self) -> bool {
        matches!(self.status, 200..=299)
    }
}
26
/// Summary of one pull from a peer feed.
#[derive(Debug, Default)]
pub struct PullResult {
    /// Number of artifacts that were newly inserted into the store.
    pub count: usize,
    /// Number of feed lines that were rejected; set to `reasons.len()` once the pull finishes.
    pub rejected: usize,
    /// The `created_at` value selected as the newest among artifacts that passed validation
    /// (including ones that were already stored); `None` if no artifact passed.
    pub last_seen: Option<String>,
    /// One human-readable entry per rejected feed line, in the order the lines were processed.
    pub reasons: Vec<String>,
}
34
35pub struct HttpFetcher {
36 client: reqwest::Client,
37}
38
39impl HttpFetcher {
40 pub fn new() -> Self {
41 Self { client: reqwest::Client::new() }
42 }
43}
44
45impl Default for HttpFetcher {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51#[async_trait]
52impl Fetcher for HttpFetcher {
53 async fn fetch(&self, url: &str) -> Result<FetchResponse, Error> {
54 let r = self.client.get(url).send().await
55 .map_err(|e| Error::Http(e.to_string()))?;
56 let status = r.status().as_u16();
57 let text = r.text().await.map_err(|e| Error::Http(e.to_string()))?;
58 Ok(FetchResponse { status, text })
59 }
60}
61
62fn parse_iso_z(s: &str) -> Option<i64> {
63 DateTime::parse_from_rfc3339(s).ok().map(|d| d.with_timezone(&Utc).timestamp_millis())
64}
65
66pub async fn pull_from_peer(
67 peer_url: &str,
68 store: &Store,
69 since: Option<&str>,
70 fetcher: &dyn Fetcher,
71 now: DateTime<Utc>,
72) -> Result<PullResult, Error> {
73 let url = match since {
74 Some(s) => format!("{peer_url}/feed?since={}", urlencoding(s)),
75 None => format!("{peer_url}/feed"),
76 };
77 let res = fetcher.fetch(&url).await?;
78 if !res.ok() {
79 return Err(Error::Http(format!("peer {peer_url} returned {}", res.status)));
80 }
81 let now_ms = now.timestamp_millis();
82
83 let mut result = PullResult::default();
84 let mut last_seen: Option<String> = None;
85
86 let lines: Vec<&str> = res.text.split('\n').rev().collect();
88 for line in lines {
89 let trimmed = line.trim();
90 if trimmed.is_empty() {
91 continue;
92 }
93 let raw: Value = match serde_json::from_str(trimmed) {
94 Ok(v) => v,
95 Err(_) => {
96 result.reasons.push("invalid json line".into());
97 continue;
98 }
99 };
100 let v = verify_artifact(&raw);
101 if !v.ok {
102 let first = v.errors.first().cloned().unwrap_or_else(|| "unknown".into());
103 result.reasons.push(format!("verify: {first}"));
104 continue;
105 }
106 let created_at = raw.get("created_at").and_then(|v| v.as_str()).unwrap_or("");
107 let created_ms = match parse_iso_z(created_at) {
108 Some(ms) => ms,
109 None => {
110 result.reasons.push("invalid created_at".into());
111 continue;
112 }
113 };
114 if (now_ms - created_ms).abs() > 24 * 60 * 60 * 1000 {
115 result.reasons.push("created_at outside ±24h window".into());
116 continue;
117 }
118 let kind = raw.get("kind").and_then(|v| v.as_str()).unwrap_or("");
119 if kind == "answer" {
120 let qcid = raw.get("question_cid").and_then(|v| v.as_str()).unwrap_or("");
121 if !store.has_artifact(qcid)? {
122 result.reasons.push("answer references unknown question_cid".into());
123 continue;
124 }
125 }
126 if kind == "rating" {
127 let tcid = raw.get("target_cid").and_then(|v| v.as_str()).unwrap_or("");
128 if !store.has_artifact(tcid)? {
129 result.reasons.push("rating references unknown target_cid".into());
130 continue;
131 }
132 }
133 let ca = created_at.to_string();
134 match &last_seen {
135 None => last_seen = Some(ca.clone()),
136 Some(prev) if ca.as_str() > prev.as_str() => last_seen = Some(ca.clone()),
137 _ => {}
138 }
139 let cid = cid_of(&raw)?;
140 if store.has_artifact(&cid)? {
141 continue;
142 }
143 store.insert_artifact(&raw)?;
144 result.count += 1;
145 }
146 result.last_seen = last_seen;
147 result.rejected = result.reasons.len();
148 Ok(result)
149}
150
/// Percent-encodes `s` so it can be embedded in a URL query value.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are copied through unchanged; every other
/// byte of the UTF-8 encoding is written as `%XX` using uppercase hexadecimal digits.
fn urlencoding(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~') {
            encoded.push(char::from(byte));
        } else {
            // Emit the high nibble first, then the low nibble.
            encoded.push('%');
            encoded.push(char::from(HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}