Skip to main content

paygress/
nostr.rs

1// Nostr client for receiving pod provisioning events with private messaging
2use anyhow::{Context, Result};
3use nostr_sdk::nips::nip04;
4use nostr_sdk::nips::nip59::UnwrappedGift;
5use nostr_sdk::{
6    Client, EventBuilder, Filter, Keys, Kind, RelayPoolNotification, Tag, Timestamp, ToBech32,
7};
8use serde::{Deserialize, Serialize};
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tracing::{debug, error, info, warn};
14
// Custom event kinds and tuning constants for Paygress provider discovery,
// liveness, and warm-standby failover.

/// Provider offer event.
///
/// NIP-33 parameterized-replaceable, so relays key it by
/// `(pubkey, kind, d-tag)`. The `d` tag is versioned
/// (`paygress:offer:v1:<npub>`) so a future schema bump can coexist on the
/// same relay without overwriting v1 events.
pub const KIND_PROVIDER_OFFER: u16 = 38383;

/// Stored provider heartbeat (NIP-33 addressable).
///
/// Heartbeats are published under TWO kinds (Unit 4 of the 12-month plan);
/// this is the one relays keep. It uses a *bucketed* `d` tag
/// (`paygress:heartbeat:v1:<npub>:<minute-bucket>`) so distinct minutes
/// coexist on the relay and the stored history is queryable for uptime
/// aggregation. This fixes the original "addressable replaces each
/// heartbeat" bug, where `calculate_uptime` saw only one event.
pub const KIND_PROVIDER_HEARTBEAT: u16 = 38384;

/// Live-presence provider heartbeat (NIP-16 ephemeral).
///
/// Relays do not store ephemeral events. That makes this kind cheap for
/// live-presence subscribers but useless for uptime history, so heartbeats
/// are published under both this kind and `KIND_PROVIDER_HEARTBEAT`.
pub const KIND_PROVIDER_HEARTBEAT_EPHEMERAL: u16 = 20384;

/// Lease revocation event (Unit 5 wiring).
///
/// A primary provider publishes this when its workload state machine emits
/// `PublishLeaseRevocation`, i.e. the local state has left `Live` for a
/// `WarmStandby` workload. It is addressable so a standby that comes online
/// after the publish can still find it on cold start.
///
/// - `d` tag: `paygress:revocation:v1:<primary_npub>:<workload_id>`
/// - each standby is added as a `#p` tag, for filterable subscriptions
pub const KIND_LEASE_REVOCATION: u16 = 38385;

/// Standby-promotion announcement.
///
/// A standby provider publishes this IMMEDIATELY after it finishes
/// promoting a workload to local primary. Before claiming their own slot,
/// other standbys for the same workload check for one of these events.
/// As a result, only the lowest-indexed standby that wins the promotion race
/// actually spawns a container; the rest see the announcement and drop
/// their slot.
///
/// The event is NIP-33 addressable with `d` tag
/// `paygress:promoted:v1:<workload_id>`. A later announcement for the same
/// workload (e.g. a re-failover) therefore replaces the previous one
/// instead of accumulating.
pub const KIND_STANDBY_PROMOTION_ANNOUNCEMENT: u16 = 38386;

/// Schema version for offer and heartbeat payloads.
///
/// Old payloads without this field deserialize to `1` via
/// `#[serde(default)]`.
pub const SCHEMA_VERSION: u8 = 1;

/// Look-back window, in seconds, for live-presence queries.
///
/// Ephemeral heartbeats are not stored by relays, so "is this provider
/// alive right now?" can only be answered from recently observed events.
/// This constant bounds those ephemeral / fast-path lookups. Stored
/// heartbeats can be queried over arbitrary windows and are not governed
/// by it.
pub const LIVE_HEARTBEAT_WINDOW_SECS: u64 = 300;

/// Bucket width, in seconds, for the stored (addressable) heartbeat kind.
///
/// One bucket per minute matches the 60 s heartbeat cadence. Each heartbeat
/// therefore lands in its own `(npub, kind, d-tag)` slot, and relays
/// preserve the history for uptime aggregation.
pub const HEARTBEAT_BUCKET_SECS: u64 = 60;
/// Relay connection settings consumed by `NostrRelaySubscriber::new`.
#[derive(Debug, Clone)]
pub struct RelayConfig {
    /// Relay WebSocket URLs to register and connect to.
    pub relays: Vec<String>,
    /// Service secret key (`nsec1…` bech32 or hex). `None` or an empty
    /// string makes the subscriber generate a fresh key pair.
    pub private_key: Option<String>,
}
76
77#[derive(Debug, Clone, Deserialize, Serialize)]
78pub struct NostrEvent {
79    pub id: String,
80    pub pubkey: String,
81    pub created_at: u64,
82    pub kind: u32,
83    pub tags: Vec<Vec<String>>,
84    pub content: String,
85    pub sig: String,
86    pub message_type: String, // "nip04" or "nip17" to track which method was used
87}
88
89#[derive(Clone)]
90pub struct NostrRelaySubscriber {
91    client: Client,
92    keys: Keys,
93    // config field removed - not used in current implementation
94}
95
96impl NostrRelaySubscriber {
97    pub async fn new(config: RelayConfig) -> Result<Self> {
98        let keys = match &config.private_key {
99            Some(private_key_hex) if !private_key_hex.is_empty() => {
100                // Parse as nsec format (nostr private key)
101                if private_key_hex.starts_with("nsec1") {
102                    Keys::parse(private_key_hex).context("Invalid nsec private key format")?
103                } else {
104                    // Assume hex format for backward compatibility
105                    Keys::parse(private_key_hex).context("Invalid private key format")?
106                }
107            }
108            _ => {
109                // Generate a new key if none provided
110                Keys::generate()
111            }
112        };
113
114        let client = Client::new(keys.clone());
115
116        // Add relays
117        for relay_url in &config.relays {
118            info!("Adding relay: {}", relay_url);
119            client
120                .add_relay(relay_url)
121                .await
122                .with_context(|| format!("Invalid relay URL: {}", relay_url))?;
123        }
124
125        info!("Connecting to {} relays...", config.relays.len());
126        client.connect().await;
127
128        // Wait a moment for connections to establish
129        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
130
131        info!("Connected to {} relays", config.relays.len());
132        info!(
133            "Service public key (npub): {}",
134            keys.public_key().to_bech32().unwrap()
135        );
136
137        Ok(Self { client, keys })
138    }
139
140    pub fn public_key(&self) -> nostr_sdk::PublicKey {
141        self.keys.public_key()
142    }
143
144    pub async fn subscribe_to_pod_events<F>(&self, handler: F) -> Result<()>
145    where
146        F: Fn(NostrEvent) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
147            + Send
148            + Sync
149            + 'static,
150    {
151        // Subscribe to messages sent TO us (filter by p-tag)
152        let nip04_filter = Filter::new()
153            .kind(Kind::EncryptedDirectMessage)
154            .pubkeys(vec![self.keys.public_key()]) // Sets #p tag
155            .limit(0);
156
157        let nip17_filter = Filter::new()
158            .kind(Kind::GiftWrap)
159            .pubkeys(vec![self.keys.public_key()]) // Sets #p tag
160            .limit(0);
161
162        // Lease revocation events (Unit 5 — standby-side promotion).
163        // Public events (no encryption); a primary publishes them
164        // addressed to the standby_providers via #p tags. The standby's
165        // listener filters by its own pubkey to receive only events
166        // addressed to it.
167        let revocation_filter = Filter::new()
168            .kind(Kind::Custom(KIND_LEASE_REVOCATION))
169            .pubkeys(vec![self.keys.public_key()])
170            .limit(0);
171
172        let _ = self.client.subscribe(nip04_filter, None).await;
173        let _ = self.client.subscribe(nip17_filter, None).await;
174        let _ = self.client.subscribe(revocation_filter, None).await;
175        info!("Subscribed to NIP-04 / NIP-17 messages and KIND_LEASE_REVOCATION events addressed to this provider");
176
177        // Handle incoming events
178        self.client.handle_notifications(|notification| async {
179            if let RelayPoolNotification::Event { relay_url: _, subscription_id: _, event } = notification {
180                match event.kind {
181                    Kind::GiftWrap => {
182                        info!("Received NIP-17 Gift Wrap message: {}", event.id);
183
184                        // Unwrap the Gift Wrap to get the inner message
185                        match self.client.unwrap_gift_wrap(&event).await {
186                            Ok(UnwrappedGift { rumor, sender }) => {
187                                info!("Unwrapped Gift Wrap from sender: {}, rumor kind: {}", sender, rumor.kind);
188
189                                // Check if the rumor is a private direct message
190                                if rumor.kind == Kind::PrivateDirectMessage {
191                                    debug!("NIP-17 rumor is PrivateDirectMessage. Content length: {}", rumor.content.len());
192
193                                    // Create a NostrEvent from the unwrapped rumor with NIP-17 flag
194                                    let nostr_event = NostrEvent {
195                                        id: rumor.id.map(|id| id.to_hex()).unwrap_or_else(|| "unknown".to_string()),
196                                        pubkey: rumor.pubkey.to_hex(),
197                                        created_at: rumor.created_at.as_u64(),
198                                        kind: rumor.kind.as_u16() as u32,
199                                        tags: rumor.tags.iter().map(|tag| {
200                                            tag.as_slice().iter().map(|s| s.to_string()).collect()
201                                        }).collect(),
202                                        content: rumor.content,
203                                        sig: "unsigned".to_string(), // UnsignedEvent doesn't have a signature
204                                        message_type: "nip17".to_string(), // Flag to indicate NIP-17
205                                    };
206
207                                    match handler(nostr_event).await {
208                                        Ok(()) => {
209                                            info!("Successfully processed NIP-17 private message: {}", event.id);
210                                        }
211                                        Err(e) => {
212                                            error!("Failed to process NIP-17 private message {}: {}", event.id, e);
213                                        }
214                                    }
215                                } else {
216                                    info!("Rumor is not a private direct message, kind: {}", rumor.kind);
217                                }
218                            }
219                            Err(e) => {
220                                error!("Failed to unwrap Gift Wrap {}: {}", event.id, e);
221                            }
222                        }
223                    }
224                    Kind::EncryptedDirectMessage => {
225                        info!("Received NIP-04 Encrypted Direct Message: {}", event.id);
226
227                        let secret_key = self.keys.secret_key();
228                        match nip04::decrypt(secret_key, &event.pubkey, &event.content) {
229                            Ok(decrypted_content) => {
230                                debug!(
231                                    "Decrypted NIP-04 message. Length: {}",
232                                    decrypted_content.len()
233                                );
234
235                                let nostr_event = NostrEvent {
236                                    id: event.id.to_hex(),
237                                    pubkey: event.pubkey.to_hex(),
238                                    created_at: event.created_at.as_u64(),
239                                    kind: event.kind.as_u16() as u32,
240                                    tags: event
241                                        .tags
242                                        .iter()
243                                        .map(|tag| {
244                                            tag.as_slice()
245                                                .iter()
246                                                .map(|s| s.to_string())
247                                                .collect()
248                                        })
249                                        .collect(),
250                                    content: decrypted_content,
251                                    sig: event.sig.to_string(),
252                                    message_type: "nip04".to_string(),
253                                };
254
255                                match handler(nostr_event).await {
256                                    Ok(()) => info!(
257                                        "Successfully processed NIP-04 private message: {}",
258                                        event.id
259                                    ),
260                                    Err(e) => error!(
261                                        "Failed to process NIP-04 private message {}: {}",
262                                        event.id, e
263                                    ),
264                                }
265                            }
266                            Err(e) => {
267                                error!(
268                                    "Failed to decrypt NIP-04 message {}: {}",
269                                    event.id, e
270                                );
271                            }
272                        }
273                    }
274                    Kind::Custom(k) if k == KIND_LEASE_REVOCATION => {
275                        // Lease revocation events are public — no
276                        // decryption. Build a NostrEvent with the
277                        // raw content; the handler dispatches by
278                        // kind and parses with parse_revocation_event.
279                        info!("Received lease revocation event: {}", event.id);
280                        let nostr_event = NostrEvent {
281                            id: event.id.to_hex(),
282                            pubkey: event.pubkey.to_hex(),
283                            created_at: event.created_at.as_u64(),
284                            kind: event.kind.as_u16() as u32,
285                            tags: event
286                                .tags
287                                .iter()
288                                .map(|tag| {
289                                    tag.as_slice().iter().map(|s| s.to_string()).collect()
290                                })
291                                .collect(),
292                            content: event.content.clone(),
293                            sig: event.sig.to_string(),
294                            message_type: "lease_revocation".to_string(),
295                        };
296                        if let Err(e) = handler(nostr_event).await {
297                            error!("Failed to process lease revocation {}: {}", event.id, e);
298                        }
299                    }
300                    _ => {
301                        info!("Received unsupported event kind: {}", event.kind);
302                    }
303                }
304            }
305            Ok(false) // Continue listening
306        }).await?;
307
308        Ok(())
309    }
310
311    pub async fn publish_offer(&self, offer: OfferEventContent) -> Result<String> {
312        let content = serde_json::to_string(&offer)?;
313        info!("Publishing offer event with content: {}", content);
314
315        let tags = vec![Tag::hashtag("paygress"), Tag::hashtag("offer")];
316
317        info!("Creating event with kind 999 and {} tags", tags.len());
318        let event = EventBuilder::new(Kind::Custom(999), content)
319            .tags(tags)
320            .sign_with_keys(&self.keys)?;
321        let event_id = event.id.to_hex();
322
323        info!("Event created with ID: {}", event_id);
324        info!("Sending offer event to relays: {}", event_id);
325
326        match self.client.send_event(&event).await {
327            Ok(res) => {
328                info!(
329                    "✅ Successfully published offer event: {} and {:?}",
330                    event_id, res
331                );
332                Ok(event_id)
333            }
334            Err(e) => {
335                error!("❌ Failed to send offer event: {}", e);
336                Err(e.into())
337            }
338        }
339    }
340
341    // Generic method to send an encrypted private message (supports both NIP-04 and NIP-17)
342    pub async fn send_encrypted_private_message(
343        &self,
344        receiver_pubkey: &str,
345        content: String,
346        message_type: &str,
347    ) -> Result<String> {
348        let receiver_pubkey_parsed = nostr_sdk::PublicKey::parse(receiver_pubkey)?;
349
350        match message_type {
351            "nip04" => {
352                let secret_key = self.keys.secret_key();
353                let encrypted_content =
354                    nip04::encrypt(secret_key, &receiver_pubkey_parsed, &content)?;
355                let receiver_tag = Tag::public_key(receiver_pubkey_parsed);
356                let alt_tag = Tag::parse(["alt", "Private Message"])?;
357
358                let event = EventBuilder::new(Kind::EncryptedDirectMessage, encrypted_content)
359                    .tags([receiver_tag, alt_tag])
360                    .sign_with_keys(&self.keys)?;
361                let event_id = self.client.send_event(&event).await?;
362                info!("Sent NIP-04 message to {}: {:?}", receiver_pubkey, event_id);
363                Ok(event_id.val.to_hex())
364            }
365            "nip17" | _ => {
366                // Default to NIP-17 if not specified or nip17
367                let event_id = self
368                    .client
369                    .send_private_msg(receiver_pubkey_parsed, content, [])
370                    .await?;
371                info!("Sent NIP-17 message to {}: {:?}", receiver_pubkey, event_id);
372                Ok(event_id.val.to_hex())
373            }
374        }
375    }
376
377    // Send access details via private encrypted message
378    pub async fn send_access_details_private_message(
379        &self,
380        request_pubkey: &str,
381        details: AccessDetailsContent,
382        message_type: &str,
383    ) -> Result<String> {
384        let details_json = serde_json::to_string(&details)?;
385        self.send_encrypted_private_message(request_pubkey, details_json, message_type)
386            .await
387    }
388
389    // Send status response via private encrypted message
390    pub async fn send_status_response(
391        &self,
392        request_pubkey: &str,
393        response: StatusResponseContent,
394        message_type: &str,
395    ) -> Result<String> {
396        let response_json = serde_json::to_string(&response)?;
397        self.send_encrypted_private_message(request_pubkey, response_json, message_type)
398            .await
399    }
400
401    // Convenience helper to send error response with individual fields
402    pub async fn send_error_response(
403        &self,
404        request_pubkey: &str,
405        error_type: &str,
406        message: &str,
407        details: Option<&str>,
408        message_type: &str,
409    ) -> Result<String> {
410        let error = ErrorResponseContent {
411            error_type: error_type.to_string(),
412            message: message.to_string(),
413            details: details.map(|s| s.to_string()),
414        };
415        self.send_error_response_private_message(request_pubkey, error, message_type)
416            .await
417    }
418
419    // Send error response via private encrypted message
420    pub async fn send_error_response_private_message(
421        &self,
422        request_pubkey: &str,
423        error: ErrorResponseContent,
424        message_type: &str,
425    ) -> Result<String> {
426        let error_json = serde_json::to_string(&error)?;
427        self.send_encrypted_private_message(request_pubkey, error_json, message_type)
428            .await
429    }
430
431    // Send top-up response via private encrypted message
432    pub async fn send_topup_response_private_message(
433        &self,
434        request_pubkey: &str,
435        response: TopUpResponseContent,
436        message_type: &str,
437    ) -> Result<String> {
438        let response_json = serde_json::to_string(&response)?;
439        self.send_encrypted_private_message(request_pubkey, response_json, message_type)
440            .await
441    }
442
443    // Get the underlying Nostr client
444    pub fn client(&self) -> &Client {
445        &self.client
446    }
447
448    // NEW: Get service public key for users
449    pub fn get_service_public_key(&self) -> String {
450        self.keys.public_key().to_hex()
451    }
452
453    #[allow(dead_code)]
454    fn convert_event(&self, event: &nostr_sdk::Event) -> NostrEvent {
455        NostrEvent {
456            id: event.id.to_hex(),
457            pubkey: event.pubkey.to_hex(),
458            created_at: event.created_at.as_u64(),
459            kind: event.kind.as_u16() as u32,
460            tags: event
461                .tags
462                .iter()
463                .map(|tag| tag.as_slice().iter().map(|s| s.to_string()).collect())
464                .collect(),
465            content: event.content.clone(),
466            sig: event.sig.to_string(),
467            message_type: "unknown".to_string(),
468        }
469    }
470
471    /// Wait for a private decrypted message from a specific sender
472    pub async fn wait_for_decrypted_message(
473        &self,
474        sender_pubkey: &str,
475        timeout_secs: u64,
476    ) -> Result<NostrEvent> {
477        let sender_pk = nostr_sdk::PublicKey::parse(sender_pubkey)?;
478        let receiver_pk = self.keys.public_key();
479
480        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
481        let tx = Arc::new(Mutex::new(Some(tx)));
482        let client = self.client.clone();
483        let receiver_keys = self.keys.clone();
484        let timeout = tokio::time::Duration::from_secs(timeout_secs);
485
486        // Subscribe to messages sent TO us. Constrain to events
487        // published AFTER subscription start: relays deliver
488        // historical events matching a subscription filter alongside
489        // new ones, so without `since` we'd match old DMs from prior
490        // sessions (e.g. an `AccessDetailsContent` for a different
491        // workload_id from yesterday's test) before the actual
492        // response to the spawn we just sent. The 60s back-off
493        // accommodates clock skew between consumer and relays —
494        // relays MAY reject `since` values too far in the future,
495        // and a small lookback covers the case where the provider
496        // has already responded by the time we subscribe.
497        let subscribe_since = std::time::SystemTime::now()
498            .duration_since(std::time::UNIX_EPOCH)
499            .map(|d| d.as_secs())
500            .unwrap_or(0)
501            .saturating_sub(60);
502        let filter = Filter::new()
503            .pubkeys(vec![receiver_pk])
504            .kinds(vec![Kind::EncryptedDirectMessage, Kind::GiftWrap])
505            .since(nostr_sdk::Timestamp::from_secs(subscribe_since));
506
507        let _ = client.subscribe(filter, None).await;
508
509        // Use tokio::select to handle timeout and notification processing
510        let result = tokio::select! {
511            notification_res = client.handle_notifications(|notification| {
512                let tx = tx.clone();
513                let receiver_keys = receiver_keys.clone();
514                let sender_pk = sender_pk.clone();
515                let client = client.clone();
516
517                async move {
518                    if let RelayPoolNotification::Event { event, .. } = notification {
519                        let mut event_to_send = None;
520
521                        match event.kind {
522                            Kind::GiftWrap => {
523                                // GiftWrap might be NIP-17
524                                if let Ok(UnwrappedGift { rumor, sender }) = client.unwrap_gift_wrap(&event).await {
525                                    if sender == sender_pk && rumor.kind == Kind::PrivateDirectMessage {
526                                        event_to_send = Some(NostrEvent {
527                                            id: rumor.id.map(|id| id.to_hex()).unwrap_or_default(),
528                                            pubkey: sender.to_hex(),
529                                            created_at: rumor.created_at.as_u64(),
530                                            kind: rumor.kind.as_u16() as u32,
531                                            tags: rumor.tags.iter().map(|tag| tag.as_slice().iter().map(|s| s.to_string()).collect()).collect(),
532                                            content: rumor.content,
533                                            sig: String::new(),
534                                            message_type: "nip17".to_string(),
535                                        });
536                                    }
537                                }
538                            }
539                            Kind::EncryptedDirectMessage => {
540                                if event.pubkey == sender_pk {
541                                    let secret_key = receiver_keys.secret_key();
542                                    if let Ok(content) = nip04::decrypt(secret_key, &event.pubkey, &event.content) {
543                                        event_to_send = Some(NostrEvent {
544                                            id: event.id.to_hex(),
545                                            pubkey: event.pubkey.to_hex(),
546                                            created_at: event.created_at.as_u64(),
547                                            kind: event.kind.as_u16() as u32,
548                                            tags: event.tags.iter().map(|tag| tag.as_slice().iter().map(|s| s.to_string()).collect()).collect(),
549                                            content,
550                                            sig: event.sig.to_string(),
551                                            message_type: "nip04".to_string(),
552                                        });
553                                    }
554                                }
555                            }
556                            _ => {}
557                        }
558
559                        if let Some(ev) = event_to_send {
560                            let mut lock = tx.lock().await;
561                            if let Some(sender) = lock.take() {
562                                let _ = sender.send(ev).await;
563                                return Ok(true); // Stop handling notifications
564                            }
565                        }
566                    }
567                    Ok(false)
568                }
569            }) => {
570                match notification_res {
571                    Ok(_) => rx.recv().await.ok_or_else(|| anyhow::anyhow!("Channel closed")),
572                    Err(e) => Err(anyhow::anyhow!("Notification handler error: {}", e)),
573                }
574            }
575            _ = tokio::time::sleep(timeout) => {
576                Err(anyhow::anyhow!("Timeout waiting for response from {}", sender_pubkey))
577            }
578        };
579
580        result
581    }
582}
583
584pub fn default_relay_config() -> RelayConfig {
585    RelayConfig {
586        relays: vec![
587            "wss://relay.damus.io".to_string(),
588            "wss://nos.lol".to_string(),
589            "wss://relay.nostr.band".to_string(),
590        ],
591        private_key: None,
592    }
593}
594
595pub fn custom_relay_config(relays: Vec<String>, private_key: Option<String>) -> RelayConfig {
596    RelayConfig {
597        relays,
598        private_key,
599    }
600}
601
602#[derive(Debug, Clone, Serialize, Deserialize)]
603pub struct PodSpec {
604    pub id: String, // Unique identifier for this spec (e.g., "basic", "standard", "premium")
605    pub name: String, // Human-readable name (e.g., "Basic", "Standard", "Premium")
606    pub description: String, // Description of the spec
607    pub cpu_millicores: u64, // CPU in millicores (1000 millicores = 1 CPU core)
608    pub memory_mb: u64, // Memory in MB
609    pub rate_msats_per_sec: u64, // Payment rate for this spec
610}
611
612#[derive(Debug, Clone, Serialize, Deserialize)]
613pub struct OfferEventContent {
614    pub minimum_duration_seconds: u64,
615    pub whitelisted_mints: Vec<String>,
616    pub pod_specs: Vec<PodSpec>, // Multiple pod specifications offered
617}
618
619/// One workload-port that a template-spawned container exposes to the
620/// consumer. Distinct from `AccessDetailsContent.node_port` (the SSH
621/// forwarding port). Empty for non-template spawns.
622#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
623pub struct TemplateAccessPort {
624    /// Host port the consumer connects to.
625    pub host_port: u16,
626    /// Container-internal port (informational; for the consumer to
627    /// understand what's running).
628    pub container_port: u16,
629    /// Wire protocol (`tcp`, `http`, `ws`, `bitcoin-rpc`, ...).
630    pub protocol: String,
631    /// Human-readable label from the template definition
632    /// (e.g. `relay-ws`, `ollama-http`, `rpc`). Lets clients route
633    /// traffic by role rather than guessing port-by-port.
634    pub label: String,
635}
636
637#[derive(Debug, Clone, Serialize, Deserialize)]
638pub struct AccessDetailsContent {
639    pub pod_npub: String,             // Pod's NPUB identifier
640    pub node_port: u16,               // SSH port for direct access
641    pub expires_at: String,           // Pod expiration time
642    pub cpu_millicores: u64,          // CPU allocation in millicores
643    pub memory_mb: u64,               // Memory allocation in MB
644    pub pod_spec_name: String,        // Human-readable spec name
645    pub pod_spec_description: String, // Spec description
646    pub instructions: Vec<String>,    // SSH connection instructions
647
648    /// Host address the consumer connects to. Same string that
649    /// appears in the SSH instruction; promoted to a structured
650    /// field so programmatic clients don't have to scrape the
651    /// instruction strings.
652    #[serde(default, skip_serializing_if = "String::is_empty")]
653    pub host_address: String,
654
655    /// Workload-specific ports published by a template spawn.
656    /// Empty for non-template (legacy) spawns. Old clients without
657    /// this field continue to deserialize cleanly.
658    #[serde(default, skip_serializing_if = "Vec::is_empty")]
659    pub template_ports: Vec<TemplateAccessPort>,
660}
661
662#[derive(Debug, Clone, Serialize, Deserialize)]
663pub struct ErrorResponseContent {
664    pub error_type: String, // Type of error (e.g., "insufficient_payment", "invalid_spec", "image_not_found")
665    pub message: String,    // Human-readable error message
666    pub details: Option<String>, // Additional error details
667}
668
669#[derive(Debug, Clone, Serialize, Deserialize)]
670pub struct TopUpResponseContent {
671    pub success: bool,
672    pub pod_npub: String,
673    pub extended_duration_seconds: u64,
674    pub new_expires_at: String,
675    pub message: String,
676}
677
/// Spawn request a consumer sends to a provider inside an encrypted Nostr
/// private message (see `send_provisioning_request_private_message`).
/// Parsed on the provider side as the `Spawn` variant of `PrivateRequest`.
///
/// Carries secrets (`cashu_token`, `ssh_password`, and optionally a volume
/// key). `Debug` is derived, so avoid `{:?}`-logging this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptedSpawnPodRequest {
    /// Cashu token paying for the pod.
    pub cashu_token: String,
    /// Optional: which pod spec to use (defaults to the first available).
    pub pod_spec_id: Option<String>,
    /// Required: container image to use for the pod. Ignored when
    /// `template_slug` resolves (see below).
    pub pod_image: String,
    /// Username for SSH access to the pod.
    pub ssh_username: String,
    /// Password for SSH access to the pod (secret).
    pub ssh_password: String,

    /// Optional template slug. When set, the provider materializes
    /// the workload's image / ports / env from its OWN local
    /// template registry (`paygress::templates`) rather than
    /// trusting consumer-supplied bytes — so a consumer cannot
    /// smuggle an arbitrary image past the provider's vetted
    /// list. `pod_image` is ignored when `template_slug` resolves.
    /// Old clients that don't set this field continue to work
    /// (`#[serde(default)]`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub template_slug: Option<String>,

    /// Replication mode requested by the consumer (Unit 5 wiring
    /// completion). Old clients that don't set this field default to
    /// `ReplicationMode::None` — same shape as before, no behavior
    /// change for unspecified spawns.
    ///
    /// `WarmStandby { standby_providers }` is the load-bearing
    /// variant: the consumer sends the SAME spawn request to every
    /// provider in the standby set; each provider determines its own
    /// role (primary if it is not in the standby list, standby
    /// otherwise) and the orchestrator coordinates failover via the
    /// `LeaseRevocation` event published by #34's wiring.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replication: Option<crate::durable_workload::ReplicationMode>,

    /// Primary provider's npub. Required when `replication` is
    /// `WarmStandby`; ignored otherwise. Lets each receiving
    /// provider self-determine its role: if its own key equals
    /// `primary_npub` it acts as the primary (spawns + heartbeats);
    /// otherwise (and only if it is in `standby_providers`) it acts as
    /// a standby (reserves a slot, listens for revocations, promotes on
    /// signal). Keys are compared with `npubs_equal`, so bech32
    /// (`npub1...`) and 64-hex spellings of the same key match.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub primary_npub: Option<String>,

    /// Consumer-assigned workload identifier (UUID-shaped string).
    /// Required when `replication` is `WarmStandby` so the primary
    /// and N standbys share one stable id across providers — the
    /// `LeaseRevocation` event uses this id, and the standby looks
    /// up its reserved slot by it on receipt. Single-provider spawns
    /// can leave this unset; the provider derives a vmid-based id
    /// internally.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workload_id: Option<String>,

    /// Optional encryption key for the workload's persistent data
    /// volume. When set, the provider creates a LUKS-encrypted
    /// volume (instead of a plain one) for `template.data_path` and
    /// destroys the volume header on tenancy end so post-eviction
    /// disk forensics reveal only ciphertext.
    ///
    /// Threat model: protects against post-eviction snooping, lazy
    /// host-operator backups, co-tenant attacks on shared storage,
    /// and cold-disk forensics if the host is seized. Does NOT
    /// protect against a live host with `CAP_SYS_PTRACE` reading
    /// /proc/<pid>/mem or extracting the LUKS key from the kernel
    /// keyring while the workload runs — that requires hardware
    /// confidential VMs (SEV-SNP / TDX), which the
    /// `attested-research-tier` `IsolationLevel` is reserved for.
    ///
    /// The key travels inside this Nostr DM, which is itself
    /// NIP-04 / NIP-17 encrypted to the provider's pubkey, so it is
    /// never visible on relays or in transit. The provider holds it
    /// only in memory while the workload runs.
    ///
    /// Old clients that don't set this field get plain volumes —
    /// same shape as before, no behavior change for unspecified
    /// spawns.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub volume_encryption: Option<VolumeEncryption>,
}
757
/// Wire-format request to encrypt the workload's data volume.
///
/// The `key_b64` is a 32-byte symmetric key, base64-encoded
/// (URL-safe, no padding). Provider feeds it to `cryptsetup
/// luksFormat` as a passphrase (raw bytes, no hashing on top).
///
/// `algorithm` is a forward-compat tag so a future schema bump can
/// introduce e.g. `xchacha20-poly1305` or hardware-attested keying
/// without breaking existing requests. v1 supports `luks2-aes-xts`
/// only; providers reject unknown algorithms with a structured
/// `UnsupportedVolumeEncryption` error so old providers seeing a
/// future-algorithm request fail loud rather than silently fall
/// back to plain volumes. (That validation is not done by
/// `decoded_key`, which only checks the key length.)
///
/// `key_b64` is secret, and `Debug` is derived, so `{:?}` on this
/// struct prints the key — keep it out of logs.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VolumeEncryption {
    /// Schema version. v1 = LUKS2 + AES-XTS-Plain64, key supplied
    /// directly. Bump for new key-derivation flows (e.g. attested
    /// key release from a TPM / TEE). Requests that omit the field
    /// deserialize to 1 (`volume_encryption_default_version`).
    #[serde(default = "volume_encryption_default_version")]
    pub version: u8,

    /// Algorithm tag. v1 only accepts `luks2-aes-xts`.
    pub algorithm: String,

    /// 32-byte key, base64 (URL-safe, unpadded). Consumer derives
    /// from a stable secret + workload_id so the same key recurs
    /// on respawn / standby promotion (the standby decrypts the
    /// checkpoint with it).
    pub key_b64: String,
}
788
/// Serde default for `VolumeEncryption::version`: a request that omits
/// the field is read as schema v1, the only version defined so far.
fn volume_encryption_default_version() -> u8 {
    1_u8
}
792
793impl VolumeEncryption {
794    /// Algorithm tag for the v1 wire format. Spelled out so callers
795    /// don't need to know the LUKS internals.
796    pub const ALGORITHM_V1: &'static str = "luks2-aes-xts";
797    pub const VERSION_V1: u8 = 1;
798
799    /// Build a v1 VolumeEncryption from a raw 32-byte key.
800    pub fn v1(key: [u8; 32]) -> Self {
801        use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
802        Self {
803            version: Self::VERSION_V1,
804            algorithm: Self::ALGORITHM_V1.to_string(),
805            key_b64: URL_SAFE_NO_PAD.encode(key),
806        }
807    }
808
809    /// Decode the base64 key back to raw bytes. Errors if the
810    /// payload is malformed or the wrong length for the declared
811    /// algorithm.
812    pub fn decoded_key(&self) -> Result<[u8; 32], anyhow::Error> {
813        use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
814        let bytes = URL_SAFE_NO_PAD
815            .decode(self.key_b64.as_bytes())
816            .map_err(|e| anyhow::anyhow!("volume_encryption.key_b64 invalid base64: {}", e))?;
817        if bytes.len() != 32 {
818            anyhow::bail!(
819                "volume_encryption.key_b64 decoded to {} bytes, expected 32",
820                bytes.len()
821            );
822        }
823        let mut out = [0u8; 32];
824        out.copy_from_slice(&bytes);
825        Ok(out)
826    }
827}
828
/// Role this provider takes for a `WarmStandby` spawn request, as
/// computed by `warm_standby_role`. Kept as a plain value type so the
/// role-routing logic in `provider::handle_spawn_request` is
/// unit-testable without spinning up a state machine.
///
/// Convention (see `warm_standby_role`):
///   - if `self_npub == primary_npub` → `Primary`
///   - else if `self_npub` is in `standby_providers` → `Standby` (with index)
///   - else → `NotAddressed` (provider should reject the request)
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WarmStandbyRole {
    /// This provider is the named primary.
    Primary,
    /// This provider appears in `standby_providers`.
    Standby {
        /// Position of the first matching entry in `standby_providers`.
        index: usize,
        /// Total number of entries in `standby_providers`.
        count: usize,
    },
    /// This provider is neither the primary nor a listed standby.
    NotAddressed,
}
844
845pub fn warm_standby_role(
846    self_npub: &str,
847    primary_npub: &str,
848    standby_providers: &[String],
849) -> WarmStandbyRole {
850    // Normalize both sides via PublicKey::parse, which accepts both
851    // bech32 (`npub1...`) and 64-hex inputs and yields a canonical
852    // bytes representation. The provider service stores its own npub
853    // as hex (`get_service_public_key` calls `.to_hex()`); the
854    // consumer-facing CLI ships bech32 in `EncryptedSpawnPodRequest.
855    // primary_npub` and `standby_providers`. Comparing the strings
856    // directly always returned `NotAddressed`, breaking warm-standby
857    // for any consumer using the bech32 form (which is everyone).
858    //
859    // Falling back to direct string comparison when parsing fails
860    // (e.g. an empty or malformed primary_npub) preserves existing
861    // semantics for that error path — the `NotAddressed` outcome is
862    // still correct.
863    if npubs_equal(self_npub, primary_npub) {
864        return WarmStandbyRole::Primary;
865    }
866    for (idx, p) in standby_providers.iter().enumerate() {
867        if npubs_equal(self_npub, p) {
868            return WarmStandbyRole::Standby {
869                index: idx,
870                count: standby_providers.len(),
871            };
872        }
873    }
874    WarmStandbyRole::NotAddressed
875}
876
877/// True iff two npub strings refer to the same Nostr public key.
878/// Accepts bech32 (`npub1...`) and 64-hex on either side and treats
879/// them as equal when they canonicalize to the same key. Falls back
880/// to direct string equality when both sides fail to parse — that
881/// keeps unit tests that use placeholder strings like
882/// `"npub1primary"` working, and cannot create false positives in
883/// production (where every input is a real key in one of the two
884/// canonical forms).
885pub fn npubs_equal(a: &str, b: &str) -> bool {
886    let pa = nostr_sdk::PublicKey::parse(a);
887    let pb = nostr_sdk::PublicKey::parse(b);
888    match (pa, pb) {
889        (Ok(ka), Ok(kb)) => ka == kb,
890        // One side parses, the other doesn't: not the same key.
891        // Without this case, comparing a real bech32 npub against a
892        // typoed/placeholder string would fall through to the
893        // direct-string branch and silently treat them as unequal —
894        // which is the right outcome, but the explicit branch makes
895        // the intent obvious.
896        (Ok(_), Err(_)) | (Err(_), Ok(_)) => false,
897        // Neither parses: fall back to direct equality. Used by
898        // unit tests with placeholder npubs; in production both
899        // sides will always parse.
900        (Err(_), Err(_)) => a == b,
901    }
902}
903
/// Private-message request to top up an existing pod with a Cashu token.
/// Parsed on the provider side as the `TopUp` variant of `PrivateRequest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptedTopUpPodRequest {
    /// The pod's npub identifier.
    pub pod_npub: String,
    /// Cashu token paying for the top-up (secret).
    pub cashu_token: String,
}
910
911// NEW: Helper function to send private message provisioning request
912pub async fn send_provisioning_request_private_message(
913    client: &Client,
914    service_pubkey: &str,
915    request: EncryptedSpawnPodRequest,
916) -> Result<String> {
917    let request_json = serde_json::to_string(&request)?;
918
919    // Send as private message
920    let service_pubkey_parsed = nostr_sdk::PublicKey::parse(service_pubkey)?;
921    let event_id = client
922        .send_private_msg(service_pubkey_parsed, request_json, [])
923        .await?;
924
925    Ok(event_id.val.to_hex())
926}
927
/// Unified request type for private messages sent to a provider.
///
/// `#[serde(untagged)]`: there is no type discriminator on the wire.
/// serde tries the variants in declaration order (`Spawn`, then `TopUp`,
/// then `Status`) and picks the first one whose fields deserialize, so
/// the variant order is load-bearing. Do not reorder without checking
/// that every payload shape still lands on its intended variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PrivateRequest {
    Spawn(EncryptedSpawnPodRequest),
    TopUp(EncryptedTopUpPodRequest),
    Status(StatusRequestContent),
}
937
938pub fn parse_private_message_content(content: &str) -> Result<PrivateRequest> {
939    match serde_json::from_str::<PrivateRequest>(content) {
940        Ok(request) => Ok(request),
941        Err(e) => {
942            // Provide detailed error information, but truncate content to avoid huge log strings
943            let truncated_content = if content.len() > 100 {
944                format!("{}...", &content[..100])
945            } else {
946                content.to_string()
947            };
948            Err(anyhow::anyhow!(
949                "JSON parsing failed: {}. Content: '{}'",
950                e,
951                truncated_content
952            ))
953        }
954    }
955}
956
957/// Parse a `NostrEvent` as a `LeaseRevocationContent` if its `kind`
958/// matches `KIND_LEASE_REVOCATION` and the body deserializes
959/// cleanly. Returns `None` for any non-revocation event so the
960/// caller can fall through to other dispatch arms without re-parsing.
961///
962/// Pure function — exposed so the standby-side dispatcher can be
963/// unit-tested without spinning up the relay pool.
964pub fn parse_revocation_event(event: &NostrEvent) -> Option<LeaseRevocationContent> {
965    if event.kind != KIND_LEASE_REVOCATION as u32 {
966        return None;
967    }
968    serde_json::from_str::<LeaseRevocationContent>(&event.content).ok()
969}
970
971// ==================== Provider Discovery Structures ====================
972
/// Capacity information for a provider (carried in heartbeats as
/// `HeartbeatContent::available_capacity`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapacityInfo {
    /// Available CPU, in millicores.
    pub cpu_available: u64,
    /// Available memory, in MB.
    pub memory_mb_available: u64,
    /// Available storage, in GB.
    pub storage_gb_available: u64,
}
980
/// Provider isolation level (Unit 4 surfaces this on offers from
/// Q1; Unit 22 will populate it with the real research-tier
/// implementation).
///
/// Serialized as kebab-case slugs (`"shared-kernel"`, `"dedicated-host"`,
/// `"attested-research-tier"`). `SharedKernel` is the `Default`, which is
/// what `#[serde(default)]` on `ProviderOfferContent::isolation_level`
/// falls back to, so v0 offers without the field parse cleanly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum IsolationLevel {
    /// Default LXC / shared-kernel container.
    #[default]
    SharedKernel,
    /// Whole-host dedicated to a single workload (no co-tenants).
    DedicatedHost,
    /// Attested AMD SEV-SNP / Intel TDX research tier (year-2 R9).
    AttestedResearchTier,
}
995
996impl IsolationLevel {
997    /// Numeric strength ordering used for "minimum acceptable tier"
998    /// comparisons. Higher = more isolated. NOT a Deserialize hint
999    /// (the wire format is the kebab-case slug); just a helper for
1000    /// the consumer's `--isolation-level` filter to decide whether
1001    /// a provider's offer meets the consumer's threshold.
1002    ///
1003    /// SharedKernel = 0, DedicatedHost = 1, AttestedResearchTier = 2.
1004    /// `meets(min)` returns true iff `self >= min` in this ordering.
1005    pub fn rank(self) -> u8 {
1006        match self {
1007            Self::SharedKernel => 0,
1008            Self::DedicatedHost => 1,
1009            Self::AttestedResearchTier => 2,
1010        }
1011    }
1012
1013    /// True iff this offer's isolation tier is at least as strong
1014    /// as `min`. Used by the consumer's offer filter.
1015    pub fn meets(self, min: IsolationLevel) -> bool {
1016        self.rank() >= min.rank()
1017    }
1018
1019    /// Parse the kebab-case slug used on the CLI and the wire.
1020    pub fn from_slug(s: &str) -> Option<Self> {
1021        match s {
1022            "shared-kernel" => Some(Self::SharedKernel),
1023            "dedicated-host" => Some(Self::DedicatedHost),
1024            "attested-research-tier" => Some(Self::AttestedResearchTier),
1025            _ => None,
1026        }
1027    }
1028
1029    /// Inverse of `from_slug` — for displaying a provider's tier
1030    /// to the consumer.
1031    pub fn slug(self) -> &'static str {
1032        match self {
1033            Self::SharedKernel => "shared-kernel",
1034            Self::DedicatedHost => "dedicated-host",
1035            Self::AttestedResearchTier => "attested-research-tier",
1036        }
1037    }
1038}
1039
/// Serde default for the `version` field of the discovery / revocation
/// payloads: events that omit `version` are read as the current
/// `SCHEMA_VERSION`.
fn default_schema_version() -> u8 {
    SCHEMA_VERSION
}
1043
/// Provider offer content published to Nostr (`KIND_PROVIDER_OFFER`, 38383).
///
/// Parameterized-replaceable event addressed by
/// `(pubkey, 38383, d="paygress:offer:v<version>:<npub>")`; see
/// `publish_provider_offer`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderOfferContent {
    /// Provider's public key; also embedded in the offer's `d` tag.
    pub provider_npub: String,
    /// Provider hostname.
    pub hostname: String,
    /// Optional location.
    pub location: Option<String>,
    /// Supported backends, e.g. `["lxc", "vm"]`.
    pub capabilities: Vec<String>,
    /// Pod specs this provider offers.
    pub specs: Vec<PodSpec>,
    /// Cashu mints listed by the provider (presumably the mints whose
    /// tokens it accepts — confirm).
    pub whitelisted_mints: Vec<String>,
    /// Self-reported uptime (presumably a 0–100 percentage — confirm).
    pub uptime_percent: f32,
    /// Number of jobs the provider reports as completed.
    pub total_jobs_completed: u64,
    /// Optional API endpoint advertised by the provider.
    pub api_endpoint: Option<String>,

    /// Schema version. Offers without the field (v0) deserialize to
    /// `SCHEMA_VERSION` via `default_schema_version`. Also written into
    /// the event's `d` and `v` tags. Bump on any breaking change.
    #[serde(default = "default_schema_version")]
    pub version: u8,

    /// Isolation level the provider promises (Unit 4 / Unit 22).
    /// Missing on the wire → `IsolationLevel::SharedKernel` (its `Default`).
    #[serde(default)]
    pub isolation_level: IsolationLevel,

    /// Optional fidelity-bond stake. Offers carrying a verifiable
    /// stake proof are eligible for the `staked` discovery tier.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stake_proof: Option<crate::stake::StakeProof>,
}
1074
/// Heartbeat content published to Nostr.
///
/// Dual-published on two kinds (see Unit 4 and `publish_heartbeat`):
/// - `KIND_PROVIDER_HEARTBEAT` (38384, addressable, with bucketed
///   `d` tag) for stored uptime history.
/// - `KIND_PROVIDER_HEARTBEAT_EPHEMERAL` (20384) for live presence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatContent {
    /// Publishing provider's public key. It must parse with
    /// `PublicKey::parse` (bech32 or hex) or publishing fails.
    pub provider_npub: String,
    /// Heartbeat time. Divided by `HEARTBEAT_BUCKET_SECS` to form the
    /// stored event's `d`-tag bucket (presumably Unix seconds — confirm).
    pub timestamp: u64,
    /// Count of active workloads reported by the provider.
    pub active_workloads: u32,
    /// Capacity the provider reports as still available.
    pub available_capacity: CapacityInfo,

    /// Schema version. See `ProviderOfferContent::version`.
    #[serde(default = "default_schema_version")]
    pub version: u8,
}
1092
/// Lease revocation content (Unit 5 wiring).
///
/// Emitted by a primary provider whose workload state machine has
/// transitioned the workload out of `Live` (typically because the
/// primary observed its own heartbeats failing to reach quorum at
/// relays — split-brain self-eviction). Standby providers listed in
/// `standby_providers` can promote on observing this event without
/// fear of two writers, because the primary has *already* left Live
/// before publishing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeaseRevocationContent {
    /// Consumer-assigned workload identifier (the same UUID-shaped
    /// string the consumer sent in the spawn request as
    /// `workload_id`). Standbys key their slot table by this id and
    /// use it to look up the matching reservation when a revocation
    /// arrives. v0 events used a u32 (the primary's local vmid) —
    /// the change to String is a wire-format bump, but no v0
    /// revocations were ever published in production (#34/#41
    /// shipped the listener, not the publisher's own consumers).
    pub workload_id: String,
    /// npub of the primary giving up the lease; part of the event's
    /// `d` tag (see `publish_lease_revocation`).
    pub primary_provider_npub: String,
    /// npubs of the standbys addressed by this revocation. Each one that
    /// parses as a public key becomes a `p` tag on the published event.
    pub standby_providers: Vec<String>,
    /// Reason for the revocation.
    pub reason: String,
    /// When the lease was revoked (presumably Unix seconds, like
    /// `StandbyPromotionAnnouncementContent::promoted_at` — confirm).
    pub revoked_at: u64,

    /// Optional Blossom URI of the latest checkpoint (Unit 6). When
    /// set, the standby restores from this state rather than spawning
    /// a fresh container.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state_uri: Option<String>,

    /// Schema version. Events without the field (v0) deserialize to
    /// `SCHEMA_VERSION` via `default_schema_version`.
    #[serde(default = "default_schema_version")]
    pub version: u8,
}
1128
/// Published by a standby provider IMMEDIATELY after it finishes
/// promoting a workload to local primary. Distinguishable from a
/// regular periodic heartbeat because the heartbeat says "this
/// provider is online" whereas this event says "this provider has
/// just claimed this workload's primary role". Higher-indexed
/// peers query for these events at promotion-time and drop their
/// slot if they see one from a peer for the same workload_id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StandbyPromotionAnnouncementContent {
    /// Consumer-assigned workload identifier (the same UUID-shaped
    /// string used by `LeaseRevocationContent.workload_id` and the
    /// spawn request's `workload_id`).
    pub workload_id: String,
    /// The npub of the standby provider that has just promoted to
    /// primary. Peers compare this against their own npub to
    /// confirm the claim is from a peer (not themselves).
    pub new_primary_npub: String,
    /// Unix-second timestamp at which the promotion completed.
    pub promoted_at: u64,
    /// Schema version; also part of the event's `d` tag. Missing on the
    /// wire → `SCHEMA_VERSION` via `default_schema_version`.
    #[serde(default = "default_schema_version")]
    pub version: u8,
}
1151
/// Provider info as seen by discovery clients.
///
/// The descriptive fields mirror `ProviderOfferContent`; `last_seen` and
/// `is_online` come from heartbeat observation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderInfo {
    /// Provider's public key.
    pub npub: String,
    pub hostname: String,
    pub location: Option<String>,
    pub capabilities: Vec<String>,
    pub specs: Vec<PodSpec>,
    pub whitelisted_mints: Vec<String>,
    pub uptime_percent: f32,
    pub total_jobs_completed: u64,
    /// Timestamp of the last heartbeat seen from this provider.
    pub last_seen: u64,
    /// Whether the discovery code currently considers the provider online.
    pub is_online: bool,
    /// Isolation tier the provider promises (mirrored from
    /// `ProviderOfferContent.isolation_level`). Consumers filtering
    /// by `--isolation-level` match on this. The `SharedKernel` default
    /// for v0 offers is applied when the *offer* is parsed; this field
    /// itself has no `#[serde(default)]`, so deserializing a
    /// `ProviderInfo` requires it to be present.
    pub isolation_level: IsolationLevel,
}
1171
/// Filter for querying providers. `Default` yields every criterion as
/// `None`; as with `isolation_level`, `None` presumably means "no
/// constraint" for each field (confirm in the filtering code).
#[derive(Debug, Clone, Default)]
pub struct ProviderFilter {
    /// Required capability (e.g. "lxc" or "vm").
    pub capability: Option<String>,
    /// Minimum uptime (presumably compared with `uptime_percent` — confirm).
    pub min_uptime: Option<f32>,
    /// Minimum memory, in MB.
    pub min_memory_mb: Option<u64>,
    /// Minimum CPU (presumably millicores, like
    /// `CapacityInfo::cpu_available` — confirm).
    pub min_cpu: Option<u64>,
    /// Minimum acceptable isolation tier. `Some(DedicatedHost)`
    /// matches `DedicatedHost` and `AttestedResearchTier` providers
    /// (anything stricter is also acceptable). `None` = no filter.
    pub isolation_level: Option<IsolationLevel>,
}
1184
1185impl NostrRelaySubscriber {
1186    /// Publish a provider offer event (Kind 38383 — parameterized
1187    /// replaceable). The `d` tag is versioned
1188    /// (`paygress:offer:v1:<npub>`) so future schema bumps coexist
1189    /// with v1 events on the same relay.
1190    pub async fn publish_provider_offer(&self, offer: ProviderOfferContent) -> Result<String> {
1191        let content = serde_json::to_string(&offer)?;
1192        info!("Publishing provider offer for {}", offer.hostname);
1193
1194        let d_tag = format!("paygress:offer:v{}:{}", offer.version, offer.provider_npub);
1195        let tags = vec![
1196            Tag::hashtag("paygress"),
1197            Tag::hashtag("compute"),
1198            Tag::parse(["d", d_tag.as_str()])?,
1199            Tag::parse(["v", offer.version.to_string().as_str()])?,
1200        ];
1201
1202        let event = EventBuilder::new(Kind::Custom(KIND_PROVIDER_OFFER), content)
1203            .tags(tags)
1204            .sign_with_keys(&self.keys)?;
1205        let event_id = event.id.to_hex();
1206
1207        match self.client.send_event(&event).await {
1208            Ok(res) => {
1209                info!("✅ Published provider offer: {} ({:?})", event_id, res);
1210                Ok(event_id)
1211            }
1212            Err(e) => {
1213                error!("❌ Failed to publish provider offer: {}", e);
1214                Err(e.into())
1215            }
1216        }
1217    }
1218
1219    /// Publish a heartbeat event on BOTH the addressable (stored,
1220    /// kind 38384) and ephemeral (kind 20384) kinds. Returns the
1221    /// addressable event id and the set of relay URLs that
1222    /// successfully accepted the *stored* heartbeat — the orchestrator
1223    /// loop (Unit 5 wiring) consumes these as `HeartbeatObservation`s
1224    /// to drive the workload state machine.
1225    ///
1226    /// The addressable form uses a per-minute bucketed `d` tag
1227    /// (`paygress:heartbeat:v1:<npub>:<bucket>`) so each minute's
1228    /// heartbeat lands in its own `(pubkey, kind, d-tag)` slot.
1229    /// Without bucketing, every heartbeat replaces the previous one
1230    /// at the relay and `calculate_uptime` sees zero history.
1231    pub async fn publish_heartbeat(
1232        &self,
1233        heartbeat: HeartbeatContent,
1234    ) -> Result<(String, Vec<String>)> {
1235        let content = serde_json::to_string(&heartbeat)?;
1236        let bucket = heartbeat.timestamp / HEARTBEAT_BUCKET_SECS;
1237        let d_tag = format!(
1238            "paygress:heartbeat:v{}:{}:{}",
1239            heartbeat.version, heartbeat.provider_npub, bucket
1240        );
1241
1242        let provider_pk = nostr_sdk::PublicKey::parse(&heartbeat.provider_npub)?;
1243        let v_tag = heartbeat.version.to_string();
1244
1245        // 1. Stored, addressable: relays keep this for history-based
1246        //    queries (calculate_uptime).
1247        let stored_tags = vec![
1248            Tag::hashtag("paygress-heartbeat"),
1249            Tag::public_key(provider_pk),
1250            Tag::parse(["d", d_tag.as_str()])?,
1251            Tag::parse(["v", v_tag.as_str()])?,
1252        ];
1253        let stored_event =
1254            EventBuilder::new(Kind::Custom(KIND_PROVIDER_HEARTBEAT), content.clone())
1255                .tags(stored_tags)
1256                .sign_with_keys(&self.keys)?;
1257        let stored_id = stored_event.id.to_hex();
1258
1259        // 2. Ephemeral: relays don't store, but live subscribers see
1260        //    it immediately. Cheap and good for dashboards.
1261        let ephemeral_tags = vec![
1262            Tag::hashtag("paygress-heartbeat"),
1263            Tag::public_key(provider_pk),
1264            Tag::parse(["v", v_tag.as_str()])?,
1265        ];
1266        let ephemeral_event =
1267            EventBuilder::new(Kind::Custom(KIND_PROVIDER_HEARTBEAT_EPHEMERAL), content)
1268                .tags(ephemeral_tags)
1269                .sign_with_keys(&self.keys)?;
1270
1271        let mut accepting_relays: Vec<String> = Vec::new();
1272        match self.client.send_event(&stored_event).await {
1273            Ok(out) => {
1274                debug!("📦 Stored heartbeat published: {}", stored_id);
1275                accepting_relays = out.success.iter().map(|u| u.to_string()).collect();
1276            }
1277            Err(e) => warn!("Failed to publish stored heartbeat: {}", e),
1278        }
1279        match self.client.send_event(&ephemeral_event).await {
1280            Ok(_) => debug!("⚡ Ephemeral heartbeat published"),
1281            Err(e) => warn!("Failed to publish ephemeral heartbeat: {}", e),
1282        }
1283
1284        info!(
1285            "💓 Heartbeat published (stored + ephemeral): {} accepted by {} relay(s)",
1286            stored_id,
1287            accepting_relays.len()
1288        );
1289        Ok((stored_id, accepting_relays))
1290    }
1291
1292    /// Publish a `LeaseRevocationContent` event (Unit 5 wiring).
1293    ///
1294    /// Addressable kind 38385, keyed by
1295    /// `(pubkey, kind, d="paygress:revocation:v1:<primary_npub>:<workload_id>")`
1296    /// so a standby coming online after the publish still observes
1297    /// the latest revocation for that workload. Each standby is
1298    /// added as a `#p` tag so subscribers filtering by their own
1299    /// pubkey see only revocations addressed to them.
1300    pub async fn publish_lease_revocation(
1301        &self,
1302        revocation: LeaseRevocationContent,
1303    ) -> Result<String> {
1304        let content = serde_json::to_string(&revocation)?;
1305        let d_tag = format!(
1306            "paygress:revocation:v{}:{}:{}",
1307            revocation.version, revocation.primary_provider_npub, revocation.workload_id
1308        );
1309        let v_tag = revocation.version.to_string();
1310        let workload_id_str = revocation.workload_id.to_string();
1311
1312        let mut tags = vec![
1313            Tag::hashtag("paygress"),
1314            Tag::hashtag("paygress-revocation"),
1315            Tag::parse(["d", d_tag.as_str()])?,
1316            Tag::parse(["v", v_tag.as_str()])?,
1317            Tag::parse(["workload", workload_id_str.as_str()])?,
1318        ];
1319        for standby_npub in &revocation.standby_providers {
1320            if let Ok(pk) = nostr_sdk::PublicKey::parse(standby_npub) {
1321                tags.push(Tag::public_key(pk));
1322            } else {
1323                warn!(
1324                    "Skipping unparseable standby npub in revocation: {}",
1325                    standby_npub
1326                );
1327            }
1328        }
1329
1330        let event = EventBuilder::new(Kind::Custom(KIND_LEASE_REVOCATION), content)
1331            .tags(tags)
1332            .sign_with_keys(&self.keys)?;
1333        let event_id = event.id.to_hex();
1334
1335        match self.client.send_event(&event).await {
1336            Ok(out) => {
1337                info!(
1338                    "📜 Lease revocation published for workload {}: {} accepted by {} relay(s)",
1339                    revocation.workload_id,
1340                    event_id,
1341                    out.success.len()
1342                );
1343                Ok(event_id)
1344            }
1345            Err(e) => {
1346                error!("Failed to publish lease revocation: {}", e);
1347                Err(e.into())
1348            }
1349        }
1350    }
1351
1352    /// Publish a `StandbyPromotionAnnouncement` for a freshly-
1353    /// promoted workload. NIP-33 addressable: `d` tag is
1354    /// `paygress:promoted:v1:<workload_id>` so a re-failover for
1355    /// the same workload replaces the prior announcement.
1356    pub async fn publish_standby_promotion_announcement(
1357        &self,
1358        announcement: StandbyPromotionAnnouncementContent,
1359    ) -> Result<String> {
1360        let content = serde_json::to_string(&announcement)?;
1361        let d_tag = format!(
1362            "paygress:promoted:v{}:{}",
1363            announcement.version, announcement.workload_id
1364        );
1365        let v_tag = announcement.version.to_string();
1366        let tags = vec![
1367            Tag::hashtag("paygress"),
1368            Tag::hashtag("paygress-promoted"),
1369            Tag::parse(["d", d_tag.as_str()])?,
1370            Tag::parse(["v", v_tag.as_str()])?,
1371            Tag::parse(["workload", announcement.workload_id.as_str()])?,
1372        ];
1373
1374        let event = EventBuilder::new(Kind::Custom(KIND_STANDBY_PROMOTION_ANNOUNCEMENT), content)
1375            .tags(tags)
1376            .sign_with_keys(&self.keys)?;
1377        let event_id = event.id.to_hex();
1378
1379        match self.client.send_event(&event).await {
1380            Ok(out) => {
1381                info!(
1382                    "📢 Standby promotion announcement published for workload {}: {} accepted by {} relay(s)",
1383                    announcement.workload_id,
1384                    event_id,
1385                    out.success.len()
1386                );
1387                Ok(event_id)
1388            }
1389            Err(e) => {
1390                error!("Failed to publish standby promotion announcement: {}", e);
1391                Err(e.into())
1392            }
1393        }
1394    }
1395
1396    /// Query for any `StandbyPromotionAnnouncement` for `workload_id`
1397    /// authored by one of `peer_npubs`. Returns the first matching
1398    /// event's content (or None). The watchdog/promotion path uses
1399    /// this BEFORE spawning a container to detect that a peer has
1400    /// already promoted; if any event is found, the local standby
1401    /// drops its slot rather than producing a duplicate primary.
1402    pub async fn query_standby_promotion_announcements(
1403        &self,
1404        workload_id: &str,
1405        peer_npubs: &[String],
1406    ) -> Result<Option<StandbyPromotionAnnouncementContent>> {
1407        if peer_npubs.is_empty() {
1408            return Ok(None);
1409        }
1410        let mut authors = Vec::new();
1411        for npub in peer_npubs {
1412            if let Ok(pk) = nostr_sdk::PublicKey::parse(npub) {
1413                authors.push(pk);
1414            }
1415        }
1416        if authors.is_empty() {
1417            return Ok(None);
1418        }
1419
1420        let filter = Filter::new()
1421            .kind(Kind::Custom(KIND_STANDBY_PROMOTION_ANNOUNCEMENT))
1422            .authors(authors)
1423            .custom_tag(
1424                nostr_sdk::SingleLetterTag::lowercase(nostr_sdk::Alphabet::D),
1425                format!("paygress:promoted:v{}:{}", SCHEMA_VERSION, workload_id),
1426            );
1427
1428        let events = self
1429            .client
1430            .fetch_events(filter, std::time::Duration::from_secs(5))
1431            .await?;
1432
1433        for event in events.iter() {
1434            if let Ok(content) =
1435                serde_json::from_str::<StandbyPromotionAnnouncementContent>(&event.content)
1436            {
1437                if content.workload_id == workload_id {
1438                    return Ok(Some(content));
1439                }
1440            }
1441        }
1442        Ok(None)
1443    }
1444
1445    /// Query all provider offers from relays
1446    pub async fn query_providers(&self) -> Result<Vec<ProviderOfferContent>> {
1447        let filter = Filter::new()
1448            .kind(Kind::Custom(KIND_PROVIDER_OFFER))
1449            .hashtag("paygress");
1450
1451        let events = self
1452            .client
1453            .fetch_events(filter, std::time::Duration::from_secs(5))
1454            .await?;
1455
1456        let mut providers = Vec::new();
1457        for event in events {
1458            match serde_json::from_str::<ProviderOfferContent>(&event.content) {
1459                Ok(offer) => providers.push(offer),
1460                Err(e) => {
1461                    warn!("Failed to parse provider offer {}: {}", event.id, e);
1462                }
1463            }
1464        }
1465
1466        info!("Found {} providers", providers.len());
1467        Ok(providers)
1468    }
1469
1470    /// Query heartbeats for a specific provider since a given time
1471    pub async fn query_heartbeats(
1472        &self,
1473        provider_npub: &str,
1474        since_secs: u64,
1475    ) -> Result<Vec<HeartbeatContent>> {
1476        let provider_pubkey = nostr_sdk::PublicKey::parse(provider_npub)?;
1477
1478        let filter = Filter::new()
1479            .kind(Kind::Custom(KIND_PROVIDER_HEARTBEAT))
1480            .author(provider_pubkey)
1481            .since(Timestamp::from(since_secs));
1482
1483        let events = self
1484            .client
1485            .fetch_events(filter, std::time::Duration::from_secs(5))
1486            .await?;
1487
1488        let mut heartbeats = Vec::new();
1489        for event in events {
1490            match serde_json::from_str::<HeartbeatContent>(&event.content) {
1491                Ok(hb) => heartbeats.push(hb),
1492                Err(e) => {
1493                    warn!("Failed to parse heartbeat {}: {}", event.id, e);
1494                }
1495            }
1496        }
1497
1498        Ok(heartbeats)
1499    }
1500
1501    /// Get the latest heartbeat for a provider (to check if online).
1502    /// Queries the stored kind 38384 (which now retains per-minute
1503    /// bucketed history thanks to the `d`-tag fix in Unit 4) within
1504    /// the live window. Ephemeral kind 20384 is not queried here
1505    /// because relays do not store it; it would only be visible to
1506    /// live subscribers.
1507    pub async fn get_latest_heartbeat(
1508        &self,
1509        provider_npub: &str,
1510    ) -> Result<Option<HeartbeatContent>> {
1511        let provider_pubkey = nostr_sdk::PublicKey::parse(provider_npub)?;
1512
1513        let live_since = std::time::SystemTime::now()
1514            .duration_since(std::time::UNIX_EPOCH)?
1515            .as_secs()
1516            - LIVE_HEARTBEAT_WINDOW_SECS;
1517
1518        let filter = Filter::new()
1519            .kind(Kind::Custom(KIND_PROVIDER_HEARTBEAT))
1520            .author(provider_pubkey)
1521            .since(Timestamp::from(live_since))
1522            .limit(1);
1523
1524        let events = self
1525            .client
1526            .fetch_events(filter, std::time::Duration::from_secs(3))
1527            .await?;
1528
1529        if let Some(event) = events.first() {
1530            match serde_json::from_str::<HeartbeatContent>(&event.content) {
1531                Ok(hb) => return Ok(Some(hb)),
1532                Err(e) => warn!("Failed to parse heartbeat: {}", e),
1533            }
1534        }
1535
1536        Ok(None)
1537    }
1538
1539    /// Get the latest heartbeats for multiple providers in a single batch query
1540    pub async fn get_latest_heartbeats_multi(
1541        &self,
1542        provider_npubs: Vec<String>,
1543    ) -> Result<std::collections::HashMap<String, HeartbeatContent>> {
1544        if provider_npubs.is_empty() {
1545            return Ok(std::collections::HashMap::new());
1546        }
1547
1548        let mut pubkeys = Vec::new();
1549        for npub in provider_npubs {
1550            if let Ok(pk) = nostr_sdk::PublicKey::parse(&npub) {
1551                pubkeys.push(pk);
1552            }
1553        }
1554
1555        let live_since = std::time::SystemTime::now()
1556            .duration_since(std::time::UNIX_EPOCH)?
1557            .as_secs()
1558            - LIVE_HEARTBEAT_WINDOW_SECS;
1559
1560        // Query the stored kind 38384 (now retains bucketed history
1561        // per Unit 4) for "have any of these providers heartbeat'd
1562        // recently?". Ephemeral 20384 isn't queried because relays
1563        // do not store it.
1564        let filter = Filter::new()
1565            .kind(Kind::Custom(KIND_PROVIDER_HEARTBEAT))
1566            .authors(pubkeys)
1567            .since(Timestamp::from(live_since));
1568
1569        // Use a short timeout of 3 seconds for fast feedback
1570        let events = self
1571            .client
1572            .fetch_events(filter, std::time::Duration::from_secs(3))
1573            .await?;
1574
1575        let mut heartbeats = std::collections::HashMap::new();
1576
1577        // Process events, keeping only the latest for each provider
1578        for event in events {
1579            if let Ok(hb) = serde_json::from_str::<HeartbeatContent>(&event.content) {
1580                match heartbeats.entry(hb.provider_npub.clone()) {
1581                    std::collections::hash_map::Entry::Occupied(mut entry) => {
1582                        let existing: &HeartbeatContent = entry.get();
1583                        if hb.timestamp > existing.timestamp {
1584                            entry.insert(hb);
1585                        }
1586                    }
1587                    std::collections::hash_map::Entry::Vacant(entry) => {
1588                        entry.insert(hb);
1589                    }
1590                }
1591            }
1592        }
1593
1594        Ok(heartbeats)
1595    }
1596
1597    /// Calculate uptime percentage for a provider over the last N
1598    /// days, against the stored kind 38384 (which now retains
1599    /// per-minute bucketed history thanks to Unit 4's `d`-tag fix —
1600    /// previously every new heartbeat replaced the prior one and
1601    /// uptime always returned ~0).
1602    pub async fn calculate_uptime(&self, provider_npub: &str, days: u32) -> Result<f32> {
1603        let now = std::time::SystemTime::now()
1604            .duration_since(std::time::UNIX_EPOCH)?
1605            .as_secs();
1606        let since = now - (days as u64 * 24 * 60 * 60);
1607
1608        let heartbeats = self.query_heartbeats(provider_npub, since).await?;
1609
1610        if heartbeats.is_empty() {
1611            return Ok(0.0);
1612        }
1613
1614        // Expected heartbeats: one per HEARTBEAT_BUCKET_SECS over
1615        // the window. Distinct heartbeats coexist on the relay
1616        // because each lands in its own bucketed `d`-tag slot.
1617        let expected = (days as f32) * 24.0 * 3600.0 / HEARTBEAT_BUCKET_SECS as f32;
1618        let actual = heartbeats.len() as f32;
1619
1620        Ok((actual / expected * 100.0).min(100.0))
1621    }
1622}
1623
/// Serde payload for a pod status request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusRequestContent {
    /// Identifier of the pod to look up. Can be an NPUB or a container ID.
    pub pod_id: String,
}
1628
/// Serde payload for a pod status response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusResponseContent {
    /// Identifier of the pod this status describes.
    pub pod_id: String,
    /// Status as free-form text. NOTE(review): the set of values the
    /// producer emits is not visible here.
    pub status: String,
    /// Expiry time as a string. NOTE(review): format (e.g. RFC 3339?) is
    /// not shown here — confirm against the producer.
    pub expires_at: String,
    /// Remaining lifetime; seconds, per the field name.
    pub time_remaining_seconds: u64,
    /// CPU allocation; millicores, per the field name.
    pub cpu_millicores: u64,
    /// Memory allocation; megabytes, per the field name.
    pub memory_mb: u64,
    /// SSH connection details for reaching the pod.
    pub ssh_host: String,
    pub ssh_port: u16,
    pub ssh_username: String,
}
1641
#[cfg(test)]
mod isolation_level_tests {
    use super::IsolationLevel;

    /// Every isolation tier, ordered weakest to strongest.
    fn all_tiers() -> [IsolationLevel; 3] {
        [
            IsolationLevel::SharedKernel,
            IsolationLevel::DedicatedHost,
            IsolationLevel::AttestedResearchTier,
        ]
    }

    #[test]
    fn rank_orders_isolation_strength() {
        let mut ranks = Vec::new();
        for tier in all_tiers() {
            ranks.push(tier.rank());
        }
        // Each tier must rank strictly above the one before it.
        for pair in ranks.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    #[test]
    fn meets_accepts_equal_or_stricter_tiers() {
        // (candidate, minimum, expected `candidate.meets(minimum)`)
        let cases = [
            // SharedKernel as the minimum: any tier qualifies.
            (IsolationLevel::SharedKernel, IsolationLevel::SharedKernel, true),
            (IsolationLevel::DedicatedHost, IsolationLevel::SharedKernel, true),
            (IsolationLevel::AttestedResearchTier, IsolationLevel::SharedKernel, true),
            // DedicatedHost as the minimum: SharedKernel does NOT qualify.
            (IsolationLevel::SharedKernel, IsolationLevel::DedicatedHost, false),
            (IsolationLevel::DedicatedHost, IsolationLevel::DedicatedHost, true),
            (IsolationLevel::AttestedResearchTier, IsolationLevel::DedicatedHost, true),
            // AttestedResearchTier as the minimum: only it qualifies.
            (IsolationLevel::SharedKernel, IsolationLevel::AttestedResearchTier, false),
            (IsolationLevel::DedicatedHost, IsolationLevel::AttestedResearchTier, false),
            (IsolationLevel::AttestedResearchTier, IsolationLevel::AttestedResearchTier, true),
        ];
        for (candidate, minimum, expected) in cases {
            let label = format!("{:?}.meets({:?})", candidate, minimum);
            assert_eq!(candidate.meets(minimum), expected, "{}", label);
        }
    }

    #[test]
    fn slug_round_trips() {
        for tier in all_tiers() {
            let slug = tier.slug();
            assert_eq!(IsolationLevel::from_slug(slug), Some(tier));
        }
    }

    #[test]
    fn from_slug_rejects_unknown() {
        let rejected = [
            "paranoid-mode",
            "",
            // Underscore form (not what we serialize) — must NOT be accepted.
            "dedicated_host",
        ];
        for slug in rejected {
            assert!(
                IsolationLevel::from_slug(slug).is_none(),
                "accepted bogus slug {:?}",
                slug
            );
        }
    }
}
1687
#[cfg(test)]
mod npubs_equal_tests {
    use super::*;

    // The public key of a real Nostr keypair (generated via
    // Keys::generate() and frozen here to avoid relying on test-time
    // randomness), in both encodings:
    //   bech32: npub1ae40uj62de87f8tvx56e6ytp5m7jd7l96mh0ew43e8q5wucm7z9q2uqvuc
    //   hex:    ee6afe4b4a6e4fe49d6c35359d1161a6fd26fbe5d6eefcbab1c9c147731bf08a

    const PUBKEY_BECH32: &str = "npub1ae40uj62de87f8tvx56e6ytp5m7jd7l96mh0ew43e8q5wucm7z9q2uqvuc";
    const PUBKEY_HEX: &str = "ee6afe4b4a6e4fe49d6c35359d1161a6fd26fbe5d6eefcbab1c9c147731bf08a";

    /// Guards the fixture itself: both constants must encode the same key.
    /// If this fails, the encoding tests below are meaningless.
    #[test]
    fn fixture_encodings_agree() {
        let pk = nostr_sdk::PublicKey::parse(PUBKEY_BECH32).expect("fixture bech32 must parse");
        assert_eq!(pk.to_hex(), PUBKEY_HEX);
    }

    #[test]
    fn bech32_matches_itself() {
        assert!(npubs_equal(PUBKEY_BECH32, PUBKEY_BECH32));
    }

    #[test]
    fn hex_matches_itself() {
        assert!(npubs_equal(PUBKEY_HEX, PUBKEY_HEX));
    }

    /// The actual bug surfaced by the warm-standby end-to-end test:
    /// the provider stores its npub as hex (via `.to_hex()`), the
    /// consumer ships bech32. Without normalization, the role check
    /// always returned `NotAddressed`, breaking warm-standby for
    /// every consumer using the bech32 form.
    #[test]
    fn bech32_matches_hex_for_same_key() {
        assert!(npubs_equal(PUBKEY_BECH32, PUBKEY_HEX));
        assert!(npubs_equal(PUBKEY_HEX, PUBKEY_BECH32));
    }

    #[test]
    fn different_keys_in_different_encodings_do_not_match() {
        // A different bech32 npub.
        let other_bech32 = "npub1hyr9m7zeegr98w4e07gvdpqrk25jfp3vku8029u8pcxsc48dq6nqxtwztv";
        assert!(!npubs_equal(PUBKEY_HEX, other_bech32));
    }

    #[test]
    fn unparseable_strings_fall_back_to_string_equality() {
        // Test placeholder npubs (used by existing provider tests
        // with strings like "npub1primary") still match by direct
        // equality; that's the contract.
        assert!(npubs_equal("npub1primary", "npub1primary"));
        assert!(!npubs_equal("npub1primary", "npub1secondary"));
    }

    #[test]
    fn one_real_one_typoed_returns_false() {
        // Mixing a real key with a typoed/placeholder string must
        // never match — the typoed string isn't a key.
        assert!(!npubs_equal(PUBKEY_BECH32, "npub1primary"));
        assert!(!npubs_equal("npub1primary", PUBKEY_HEX));
    }
}