Skip to main content

paygress/observatory/
aggregator.rs

// Pure aggregator: turns observed Nostr events into a reproducible
// JSON snapshot. No I/O, no clock — `now` and stake statuses are
// inputs, so two invocations with the same inputs produce
// byte-identical output regardless of where they run.
5
6use std::collections::{BTreeMap, HashMap, HashSet};
7
8use serde::{Deserialize, Serialize};
9
10use crate::nostr::{HeartbeatContent, IsolationLevel, PodSpec, ProviderOfferContent};
11use crate::reputation::{score_provider, CompletionReceipt, ConsumerProfile, SybilHeuristics};
12use crate::stake::{stake_rank, StakeStatus};
13
/// Schema version stamped into every snapshot's `version` field.
///
/// The wire format has carried a version from the start so that the
/// static frontend can branch on schema bumps and still read older
/// archived snapshots.
pub const SNAPSHOT_VERSION: u8 = 1;
18
/// Length of the rolling receipt window, in seconds (30 days, per the
/// plan's R10). Receipts completed earlier than `now` minus this value
/// do not contribute to a provider's score.
pub const RECEIPT_WINDOW_SECS: u64 = 30 * 24 * 60 * 60;
22
23/// Inputs to a single aggregator run. The caller (binary entry
24/// point) is responsible for I/O — fetching offers/heartbeats/
25/// receipts from Nostr, fetching stake statuses from Esplora,
26/// loading consumer first-seen times. Once those are in hand, the
27/// snapshot is a pure function of them.
28pub struct AggregatorInput {
29    pub offers: Vec<ProviderOfferContent>,
30    pub heartbeats: Vec<HeartbeatContent>,
31    pub receipts: Vec<CompletionReceipt>,
32    pub consumers: HashMap<String, ConsumerProfile>,
33    /// Pre-computed by the binary entry point against Esplora,
34    /// keyed by `provider_npub`. Letting the caller pass these in
35    /// keeps `compute_snapshot` pure and lets two independent
36    /// runs reproduce as long as they pass the same data.
37    pub stake_statuses: HashMap<String, StakeStatus>,
38    /// 5 anchor providers (Paygress-team-run) flagged in the UI.
39    pub anchor_providers: HashSet<String>,
40}
41
42/// Top-level snapshot the static frontend reads.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct Snapshot {
45    pub version: u8,
46    pub generated_at: u64,
47    /// Receipts older than `now - RECEIPT_WINDOW_SECS` were
48    /// excluded. Stamped here so consumers can verify reproducibility.
49    pub receipt_window_secs: u64,
50    /// Sorted by `npub` for byte-identical reproducibility.
51    pub providers: Vec<ProviderSummary>,
52}
53
54/// One provider's row in the snapshot.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct ProviderSummary {
57    pub npub: String,
58    pub hostname: String,
59    /// Coarse jurisdiction. Only present if the offer opted in
60    /// (`location.is_some()`). The plan guarantees no involuntary
61    /// geo surfacing.
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub jurisdiction: Option<String>,
64    /// Sybil-resistant score from the receipt set restricted to
65    /// the rolling window. See `paygress::reputation`.
66    pub score: f32,
67    /// Last seen (most recent heartbeat across the input set).
68    pub last_seen_unix: Option<u64>,
69    /// `Some` only if the offer carried a stake proof AND the
70    /// pre-computed `StakeStatus` was `Valid`.
71    #[serde(skip_serializing_if = "Option::is_none")]
72    pub stake: Option<StakeSummary>,
73    pub anchor: bool,
74    pub specs: Vec<PodSpec>,
75    pub isolation_level: IsolationLevel,
76}
77
78/// What we render about a provider's stake when it verifies.
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct StakeSummary {
81    pub effective_sats: u64,
82    pub locktime_unix: u64,
83    /// log(sats × locked_seconds), used for staked-tier ordering.
84    pub rank: f64,
85}
86
87/// Compute the snapshot. Pure: no clock, no network, no filesystem.
88pub fn compute_snapshot(input: &AggregatorInput, now: u64) -> Snapshot {
89    let heuristics = SybilHeuristics::default();
90    let receipt_floor = now.saturating_sub(RECEIPT_WINDOW_SECS);
91
92    // Filter receipts to the rolling window once; reused across
93    // every provider.
94    let windowed: Vec<&CompletionReceipt> = input
95        .receipts
96        .iter()
97        .filter(|r| r.completed_at >= receipt_floor)
98        .collect();
99
100    // Most-recent heartbeat per provider, across the input set.
101    let mut last_seen: HashMap<&str, u64> = HashMap::new();
102    for hb in &input.heartbeats {
103        let cur = last_seen.entry(hb.provider_npub.as_str()).or_insert(0);
104        if hb.timestamp > *cur {
105            *cur = hb.timestamp;
106        }
107    }
108
109    // Build per-provider rows. Use a BTreeMap so iteration order is
110    // deterministic by npub — the reproducibility property.
111    let mut by_npub: BTreeMap<&str, &ProviderOfferContent> = BTreeMap::new();
112    for offer in &input.offers {
113        by_npub.insert(offer.provider_npub.as_str(), offer);
114    }
115
116    let mut providers = Vec::with_capacity(by_npub.len());
117    for (npub, offer) in by_npub {
118        // Score: pass closures that always-accept signatures and
119        // payment proofs. Real aggregator wires these to nostr-sdk
120        // Schnorr verification + cdk mint-key checks; for the pure
121        // snapshot path we trust the caller to have pre-filtered
122        // bad receipts (they would be dropped during the
123        // Nostr-crawl step).
124        let receipts_owned: Vec<CompletionReceipt> =
125            windowed.iter().map(|r| (*r).clone()).collect();
126        let score = score_provider(
127            npub,
128            &receipts_owned,
129            &input.consumers,
130            now,
131            &heuristics,
132            |_| true,
133            |_| true,
134        );
135
136        let stake = input.stake_statuses.get(npub).and_then(|s| match s {
137            StakeStatus::Valid {
138                effective_sats,
139                locktime_unix,
140            } => Some(StakeSummary {
141                effective_sats: *effective_sats,
142                locktime_unix: *locktime_unix,
143                rank: stake_rank(*effective_sats, *locktime_unix, now),
144            }),
145            _ => None,
146        });
147
148        providers.push(ProviderSummary {
149            npub: npub.to_string(),
150            hostname: offer.hostname.clone(),
151            jurisdiction: offer.location.clone(),
152            score,
153            last_seen_unix: last_seen.get(npub).copied(),
154            stake,
155            anchor: input.anchor_providers.contains(npub),
156            specs: offer.specs.clone(),
157            isolation_level: offer.isolation_level.clone(),
158        });
159    }
160
161    Snapshot {
162        version: SNAPSHOT_VERSION,
163        generated_at: now,
164        receipt_window_secs: RECEIPT_WINDOW_SECS,
165        providers,
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use crate::nostr::{CapacityInfo, IsolationLevel, SCHEMA_VERSION};
    use crate::reputation::PaymentProof;
    use crate::stake::StakeStatus;

    /// Fixed "current time" shared by every test.
    const NOW: u64 = 1_700_000_000;
    /// Seconds in one day.
    const DAY: u64 = 24 * 3600;

    /// Minimal offer fixture; only npub, hostname and location vary.
    fn offer(npub: &str, hostname: &str, location: Option<&str>) -> ProviderOfferContent {
        ProviderOfferContent {
            provider_npub: String::from(npub),
            hostname: String::from(hostname),
            location: location.map(String::from),
            capabilities: vec![String::from("lxc")],
            specs: Vec::new(),
            whitelisted_mints: vec![String::from("https://mint.example")],
            uptime_percent: 99.0,
            total_jobs_completed: 5,
            api_endpoint: None,
            version: SCHEMA_VERSION,
            isolation_level: IsolationLevel::SharedKernel,
            stake_proof: None,
        }
    }

    /// Heartbeat fixture with zero load and zero capacity.
    fn heartbeat(npub: &str, ts: u64) -> HeartbeatContent {
        let available_capacity = CapacityInfo {
            cpu_available: 0,
            memory_mb_available: 0,
            storage_gb_available: 0,
        };
        HeartbeatContent {
            provider_npub: String::from(npub),
            timestamp: ts,
            active_workloads: 0,
            available_capacity,
            version: SCHEMA_VERSION,
        }
    }

    /// Fully-delivered, successful, co-signed receipt fixture.
    fn receipt(provider: &str, consumer: &str, completed_at: u64) -> CompletionReceipt {
        let payment_proof = PaymentProof {
            mint_url: String::from("https://mint.example"),
            swap_response_signature: String::from("sig"),
        };
        CompletionReceipt {
            lease_id: format!("l-{}-{}-{}", provider, consumer, completed_at),
            provider_npub: String::from(provider),
            consumer_npub: String::from(consumer),
            duration_paid: 3600,
            duration_delivered: 3600,
            success_flag: 1.0,
            payment_proof,
            version: 1,
            consumer_signature: Some(String::from("c")),
            provider_co_signature: Some(String::from("p")),
            completed_at,
        }
    }

    /// Input carrying only the given offers; every other collection is
    /// empty and can be filled in by the individual test.
    fn input_with_offers(offers: Vec<ProviderOfferContent>) -> AggregatorInput {
        AggregatorInput {
            offers,
            heartbeats: Vec::new(),
            receipts: Vec::new(),
            consumers: HashMap::new(),
            stake_statuses: HashMap::new(),
            anchor_providers: HashSet::new(),
        }
    }

    /// Look-up table from npub to its row in the snapshot.
    fn index_by_npub(snap: &Snapshot) -> HashMap<&str, &ProviderSummary> {
        snap.providers
            .iter()
            .map(|row| (row.npub.as_str(), row))
            .collect()
    }

    #[test]
    fn snapshot_is_byte_identical_when_inputs_match() {
        let mut input = input_with_offers(vec![
            offer("npubB", "host-b", None),
            offer("npubA", "host-a", Some("BER")),
        ]);
        input.heartbeats = vec![heartbeat("npubA", NOW - 60), heartbeat("npubA", NOW - 120)];

        let first = serde_json::to_string(&compute_snapshot(&input, NOW)).unwrap();
        let second = serde_json::to_string(&compute_snapshot(&input, NOW)).unwrap();
        assert_eq!(first, second);
    }

    #[test]
    fn providers_are_sorted_by_npub_for_reproducibility() {
        let input = input_with_offers(vec![
            offer("npubZ", "z", None),
            offer("npubA", "a", None),
            offer("npubM", "m", None),
        ]);

        let snap = compute_snapshot(&input, NOW);
        let order: Vec<&str> = snap.providers.iter().map(|row| row.npub.as_str()).collect();
        assert_eq!(order, vec!["npubA", "npubM", "npubZ"]);
    }

    #[test]
    fn jurisdiction_is_only_emitted_if_offer_opted_in() {
        let input = input_with_offers(vec![
            offer("npubA", "a", Some("BER")),
            offer("npubB", "b", None), // opted out
        ]);

        let snap = compute_snapshot(&input, NOW);
        let rows = index_by_npub(&snap);
        assert_eq!(rows["npubA"].jurisdiction.as_deref(), Some("BER"));
        assert!(rows["npubB"].jurisdiction.is_none());
    }

    #[test]
    fn old_receipts_are_aged_out_of_window() {
        let mut input = input_with_offers(vec![offer("P", "p", None)]);
        input.receipts = vec![
            receipt("P", "C", NOW - 7 * DAY),  // 7 days ago: inside the window
            receipt("P", "C", NOW - 60 * DAY), // 60 days ago: aged out
        ];
        input.consumers.insert(
            String::from("C"),
            ConsumerProfile {
                npub: String::from("C"),
                first_seen: NOW - 365 * DAY, // very old, passes history gate
            },
        );

        let snap = compute_snapshot(&input, NOW);
        // One windowed receipt from one consumer at one provider →
        // capped to max_share = 0.20 by the Sybil cap.
        let delta = (snap.providers[0].score - 0.20).abs();
        assert!(delta < 1e-6);
    }

    #[test]
    fn anchor_providers_are_flagged() {
        let mut input = input_with_offers(vec![
            offer("npubAnchor", "anchor", None),
            offer("npubOther", "other", None),
        ]);
        input.anchor_providers.insert(String::from("npubAnchor"));

        let snap = compute_snapshot(&input, NOW);
        let rows = index_by_npub(&snap);
        assert!(rows["npubAnchor"].anchor);
        assert!(!rows["npubOther"].anchor);
    }

    #[test]
    fn stake_summary_only_emitted_when_status_is_valid() {
        let mut input = input_with_offers(vec![
            offer("npubStaked", "s", None),
            offer("npubSpent", "x", None),
        ]);
        input.stake_statuses.insert(
            String::from("npubStaked"),
            StakeStatus::Valid {
                effective_sats: 100_000,
                locktime_unix: NOW + 30 * DAY,
            },
        );
        input
            .stake_statuses
            .insert(String::from("npubSpent"), StakeStatus::Spent);

        let snap = compute_snapshot(&input, NOW);
        let rows = index_by_npub(&snap);
        assert!(rows["npubStaked"].stake.is_some());
        assert!(rows["npubSpent"].stake.is_none());
        // Stake rank > 0 for a valid lock.
        assert!(rows["npubStaked"].stake.as_ref().unwrap().rank > 0.0);
    }

    #[test]
    fn last_seen_picks_max_timestamp() {
        let mut input = input_with_offers(vec![offer("P", "p", None)]);
        input.heartbeats = vec![
            heartbeat("P", NOW - 600),
            heartbeat("P", NOW - 60),
            heartbeat("P", NOW - 300),
        ];

        let snap = compute_snapshot(&input, NOW);
        assert_eq!(snap.providers[0].last_seen_unix, Some(NOW - 60));
    }

    #[test]
    fn empty_input_yields_empty_provider_list() {
        let input = input_with_offers(Vec::new());

        let snap = compute_snapshot(&input, NOW);
        assert_eq!(snap.providers.len(), 0);
        assert_eq!(snap.version, SNAPSHOT_VERSION);
        assert_eq!(snap.receipt_window_secs, RECEIPT_WINDOW_SECS);
    }
}