Skip to main content

paygress/
provider.rs

1// Provider Service
2//
3// Runs on machine operator's server to:
4// - Publish provider offer to Nostr
5// - Send periodic heartbeats
6// - Listen for and handle spawn requests
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tracing::{debug, error, info, warn};
14
15use crate::cashu::{
16    derive_seed_from_nostr_key, validate_and_redeem, CdkRedeemer, MintRedeemer, RedeemError,
17};
18use crate::compute::{ComputeBackend, ContainerConfig, PortMapping};
19use crate::docker::DockerBackend;
20use crate::durable_workload::{
21    DurableWorkload, HeartbeatObservation, QuorumConfig, StateMachineEvent, WorkloadState,
22    WorkloadStateMachine,
23};
24use crate::lxd::LxdBackend;
25use crate::nostr::{
26    parse_private_message_content, warm_standby_role, AccessDetailsContent, CapacityInfo,
27    EncryptedSpawnPodRequest, EncryptedTopUpPodRequest, ErrorResponseContent, HeartbeatContent,
28    LeaseRevocationContent, NostrRelaySubscriber, PodSpec, PrivateRequest, ProviderOfferContent,
29    RelayConfig, StandbyPromotionAnnouncementContent, StatusRequestContent, StatusResponseContent,
30    TopUpResponseContent, WarmStandbyRole,
31};
32use crate::proxmox::{ProxmoxBackend, ProxmoxClient};
33use crate::templates::{TemplateDefinition, TemplateName};
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
36pub enum BackendType {
37    Proxmox,
38    LXD,
39    /// Docker backend. Required for the killer-templates path
40    /// (#31): templates use real public Docker images that LXD
41    /// can't run natively. Provider must have the `docker` CLI
42    /// installed and accessible to the running user.
43    Docker,
44    /// KVM/qemu backend. Each spawn is its own VM with its own
45    /// kernel — no co-tenant attacks via container escape.
46    /// Publishes `IsolationLevel::DedicatedHost` on the offer so
47    /// consumers filtering by `--isolation-level dedicated-host`
48    /// match this provider. Requires `/dev/kvm` and
49    /// `qemu-system-x86_64` on the host. Killer templates (Docker
50    /// images) are NOT served on this backend in v1; consumers get
51    /// vanilla Ubuntu VMs with SSH access.
52    Kvm,
53}
54
55impl Default for BackendType {
56    fn default() -> Self {
57        Self::Proxmox
58    }
59}
60
61/// Provider configuration
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct ProviderConfig {
64    #[serde(default)]
65    pub backend_type: BackendType,
66
67    // Proxmox / Backend settings
68    pub proxmox_url: String,
69    pub proxmox_token_id: String,
70    pub proxmox_token_secret: String,
71    pub proxmox_node: String,
72    pub proxmox_storage: String,
73    pub proxmox_template: String,
74    pub proxmox_bridge: String,
75    pub vmid_range_start: u32,
76    pub vmid_range_end: u32,
77
78    // Nostr settings
79    pub nostr_private_key: String,
80    pub nostr_relays: Vec<String>,
81
82    // Provider metadata
83    pub provider_name: String,
84    pub provider_location: Option<String>,
85    pub public_ip: String,
86    pub capabilities: Vec<String>,
87
88    // Pricing & specs
89    pub specs: Vec<PodSpec>,
90    pub whitelisted_mints: Vec<String>,
91
92    // Operational settings
93    pub heartbeat_interval_secs: u64,
94    pub minimum_duration_seconds: u64,
95
96    // Tunnel settings (for providers behind NAT)
97    #[serde(default)]
98    pub tunnel_enabled: bool,
99    #[serde(default)]
100    pub tunnel_interface: Option<String>,
101    #[serde(default)]
102    pub ssh_port_start: Option<u16>,
103    #[serde(default)]
104    pub ssh_port_end: Option<u16>,
105
106    // Cashu wallet settings (Unit 1: real mint redemption on the
107    // Nostr-DM path). The wallet stores swapped proofs, keysets, and
108    // quotes; one redb file holds state for every mint the provider
109    // accepts. Defaults to a path next to the binary so existing
110    // operators don't need to update their config.
111    #[serde(default = "default_cashu_wallet_db_path")]
112    pub cashu_wallet_db_path: String,
113}
114
/// Serde fallback for `ProviderConfig::cashu_wallet_db_path`.
///
/// Returns a relative path, so the redb file lands in the process's current
/// working directory.
fn default_cashu_wallet_db_path() -> String {
    String::from("./paygress-cashu-wallet.redb")
}
118
119impl Default for ProviderConfig {
120    fn default() -> Self {
121        Self {
122            backend_type: BackendType::Proxmox,
123            proxmox_url: "https://localhost:8006/api2/json".to_string(),
124            proxmox_token_id: "root@pam!paygress".to_string(),
125            proxmox_token_secret: String::new(),
126            proxmox_node: "pve".to_string(),
127            proxmox_storage: "local-lvm".to_string(),
128            proxmox_template: "local:vztmpl/ubuntu-22.04-standard.tar.zst".to_string(),
129            proxmox_bridge: "vmbr0".to_string(),
130            vmid_range_start: 1000,
131            vmid_range_end: 1999,
132            nostr_private_key: String::new(),
133            nostr_relays: vec![
134                "wss://relay.damus.io".to_string(),
135                "wss://nos.lol".to_string(),
136            ],
137            provider_name: "Paygress Provider".to_string(),
138            provider_location: None,
139            public_ip: "127.0.0.1".to_string(),
140            capabilities: vec!["lxc".to_string()],
141            specs: vec![PodSpec {
142                id: "basic".to_string(),
143                name: "Basic".to_string(),
144                description: "1 vCPU, 1GB RAM".to_string(),
145                cpu_millicores: 1000,
146                memory_mb: 1024,
147                rate_msats_per_sec: 50,
148            }],
149            whitelisted_mints: vec!["https://mint.minibits.cash".to_string()],
150            heartbeat_interval_secs: 60,
151            minimum_duration_seconds: 60,
152            tunnel_enabled: false,
153            tunnel_interface: None,
154            ssh_port_start: None,
155            ssh_port_end: None,
156            cashu_wallet_db_path: default_cashu_wallet_db_path(),
157        }
158    }
159}
160
/// Active workload tracking
///
/// One record per workload running on this provider. Stored in
/// `ProviderService::active_workloads`, keyed by `vmid`.
///
/// NOTE(review): only `Serialize` is derived, and `#[serde(default)]` is a
/// deserialization-only attribute, so the `default` markers below currently
/// have no effect. Confirm whether `Deserialize` was meant to be derived.
#[derive(Debug, Clone, Serialize)]
pub struct WorkloadInfo {
    /// Backend-local id of the workload; the key used by `active_workloads`
    /// and by the workload state machine.
    pub vmid: u32,
    pub workload_type: String, // "lxc" or "vm"
    /// Id of the spec the workload runs under (presumably a `PodSpec::id` —
    /// TODO confirm against the spawn handler).
    pub spec_id: String,
    /// Creation time (presumably Unix seconds — TODO confirm).
    pub created_at: u64,
    /// Expiry time (presumably Unix seconds — TODO confirm).
    pub expires_at: u64,
    /// Npub of the workload's owner (presumably the requesting consumer —
    /// TODO confirm).
    pub owner_npub: String,

    /// Replication mode chosen at spawn time. `None` for the default
    /// single-container path (no failover); `WarmStandby` registers a
    /// list of standby providers so the orchestrator emits a
    /// `LeaseRevocation` on local eviction. `Checkpointed` is reserved
    /// for Unit 6 (consumer-side respawn from Blossom checkpoint).
    #[serde(default)]
    pub replication: crate::durable_workload::ReplicationMode,

    /// Restart policy applied when the workload is locally evicted
    /// without a warm-standby. Default: `OnFailure { max_attempts: 3 }`.
    #[serde(default)]
    pub restart_policy: crate::durable_workload::RestartPolicy,

    /// Optional Blossom URI of the latest published checkpoint for
    /// this workload. Populated by Unit 6 (checkpoint pipeline);
    /// included in revocation events so a standby can restore.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state_uri: Option<String>,

    /// Consumer-assigned workload identifier (UUID-shaped string),
    /// set from `EncryptedSpawnPodRequest.workload_id` at spawn time
    /// for warm-standby workloads. Used by the orchestrator's
    /// `PublishLeaseRevocation` handler so the published revocation
    /// carries the same id the standbys keyed their slots by.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub consumer_workload_id: Option<String>,
}
198
/// A standby slot reserved for a warm-standby workload. The
/// consumer's spawn request was paid for and acknowledged, but no
/// container has been created yet — the standby is "armed" and
/// waiting for a `LeaseRevocation` event from the primary.
///
/// Stored in `ProviderService::standby_slots` keyed by `workload_id`
/// (consumer-assigned UUID, shared by primary and all standbys).
/// On revocation, the standby's promotion handler:
///   1. Sleeps `index * promotion_delay_secs` (ordered backoff for
///      single-writer; standby 0 promotes immediately, standby 1
///      waits one delay window, etc.)
///   2. Spawns the container using `container_config`
///   3. Removes the slot, adds to `active_workloads`, registers with
///      the state machine as primary
#[derive(Debug, Clone)]
pub struct StandbySlot {
    /// Consumer-assigned workload id; the key in `standby_slots` and the id a
    /// matching `LeaseRevocation` must carry.
    pub workload_id: String,
    /// Npub of the primary provider. The request listener ignores a revocation
    /// whose `primary_provider_npub` differs from this value.
    pub primary_npub: String,
    /// This provider's position among the standbys; the `index` in the ordered
    /// promotion backoff described above.
    pub standby_index: usize,
    /// Total number of standbys registered for this workload.
    pub standby_count: usize,
    /// Container configuration used to spawn the workload on promotion.
    pub container_config: ContainerConfig,
    /// Spec id for the workload (presumably copied into `WorkloadInfo` on
    /// promotion — TODO confirm).
    pub spec_id: String,
    /// Lease expiry (presumably Unix seconds, like `created_at` — TODO confirm).
    pub expires_at: u64,
    /// Npub of the workload's owner (presumably the consumer — TODO confirm).
    pub owner_npub: String,
    /// Unix-second timestamp at which the slot was reserved. Used as
    /// the silence-baseline by the watchdog when no primary heartbeat
    /// has been observed yet — without this, a fresh slot would treat
    /// `last_seen == 0` as immediate silence and promote a healthy
    /// primary on the first watchdog tick (race window: 0–60s while
    /// waiting for the primary's next heartbeat to land on the relay).
    pub created_at: u64,
    /// Npubs of the OTHER standbys for this workload (excludes self
    /// and the primary). Used at promotion-time to detect that a
    /// lower-indexed peer has already promoted: a freshly-promoted
    /// peer publishes a heartbeat from its own npub immediately, and
    /// this standby queries those npubs before claiming the slot
    /// itself. Without this list, every standby would promote
    /// independently after the silence window, producing split-brain.
    pub peer_standby_npubs: Vec<String>,
}
239
/// Provider service that manages the node
///
/// Owns the compute backend, the Nostr relay client, and the Cashu redeemer.
/// Shared mutable state sits behind `Arc<Mutex<_>>` (tokio mutexes), so the
/// concurrently running loops and request handlers can clone and share it.
pub struct ProviderService {
    /// Configuration the service was constructed with.
    config: ProviderConfig,
    /// Backend selected by `config.backend_type`.
    backend: Arc<dyn ComputeBackend>,
    /// Relay client used for offers, heartbeats, incoming requests, and replies.
    nostr: NostrRelaySubscriber,
    /// Cashu redeemer handed to the spawn and top-up request handlers.
    redeemer: Arc<dyn MintRedeemer>,
    /// Workloads running locally, keyed by vmid.
    active_workloads: Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
    /// Completed-job counter (reported in the provider offer) and start time.
    stats: Arc<Mutex<ProviderStats>>,

    /// Workload state machine (Unit 5 wiring). Keyed by vmid; each
    /// entry tracks the lifecycle of one local workload through
    /// `Provisioning → Live → Suspect → Evicted/Respawning/Failed`.
    /// The orchestrator loop ticks this against the buffered
    /// `HeartbeatObservation`s and acts on emitted events.
    state_machine: Arc<Mutex<WorkloadStateMachine>>,

    /// Buffered heartbeat observations awaiting the next orchestrator
    /// tick. Filled by the heartbeat loop after each publish (one
    /// observation per relay that ACK'd), drained on each
    /// orchestrator iteration.
    observation_buffer: Arc<Mutex<Vec<HeartbeatObservation>>>,

    /// Reserved warm-standby slots, keyed by consumer-assigned
    /// `workload_id`. Populated when a spawn request arrives with
    /// `replication = WarmStandby` AND the role-detection helper
    /// classifies this provider as a standby. Drained when a
    /// matching `LeaseRevocation` arrives (slot promotes to a real
    /// active workload) or when the slot's expiry passes (cleanup).
    standby_slots: Arc<Mutex<HashMap<String, StandbySlot>>>,
}
270
/// Provider-level statistics, shared behind `ProviderService::stats`.
#[derive(Debug, Clone, Default)]
struct ProviderStats {
    /// Completed-job count advertised in the provider offer.
    total_jobs_completed: u64,
    /// Unix-second timestamp taken when the service was constructed.
    uptime_start: u64,
}
276
277impl ProviderService {
278    /// Create a new provider service
279    pub async fn new(config: ProviderConfig) -> Result<Self> {
280        let backend: Arc<dyn ComputeBackend> = match config.backend_type {
281            BackendType::Proxmox => {
282                let client = ProxmoxClient::new(
283                    &config.proxmox_url,
284                    &config.proxmox_token_id,
285                    &config.proxmox_token_secret,
286                    &config.proxmox_node,
287                )?;
288                Arc::new(ProxmoxBackend::new(
289                    client,
290                    &config.proxmox_storage,
291                    &config.proxmox_bridge,
292                    &config.proxmox_template,
293                ))
294            }
295            BackendType::LXD => Arc::new(LxdBackend::new(
296                &config.proxmox_storage, // Reuse storage field for pool name
297                &config.proxmox_bridge,  // Reuse bridge for network
298            )),
299            BackendType::Docker => Arc::new(DockerBackend::new()),
300            BackendType::Kvm => {
301                // Fail-fast: surface the "this host doesn't have
302                // KVM" error at provider startup, not at first
303                // spawn (when a paying consumer has already
304                // committed a Cashu token).
305                if let Err(e) = crate::kvm::KvmBackend::check_kvm_available().await {
306                    tracing::error!("KVM backend selected but unavailable: {}", e);
307                    anyhow::bail!("KVM backend unavailable: {}", e);
308                }
309                Arc::new(crate::kvm::KvmBackend::new(crate::kvm::KvmConfig::default()))
310            }
311        };
312
313        // Initialize Nostr client
314        let relay_config = RelayConfig {
315            relays: config.nostr_relays.clone(),
316            private_key: Some(config.nostr_private_key.clone()),
317        };
318        let nostr = NostrRelaySubscriber::new(relay_config).await?;
319
320        // Initialize the Cashu redeemer. Wallet identity is derived
321        // deterministically from the provider's Nostr private key so
322        // the same provider sees a consistent proof history across
323        // restarts. The redb file holds proofs, keysets, and quotes
324        // for every mint this provider accepts.
325        let wallet_db = cdk_redb::wallet::WalletRedbDatabase::new(std::path::Path::new(
326            &config.cashu_wallet_db_path,
327        ))
328        .map_err(|e| {
329            anyhow::anyhow!(
330                "failed to open cashu wallet database at {}: {}",
331                config.cashu_wallet_db_path,
332                e
333            )
334        })?;
335        let seed = derive_seed_from_nostr_key(&config.nostr_private_key);
336        let redeemer: Arc<dyn MintRedeemer> = Arc::new(CdkRedeemer::new(Arc::new(wallet_db), seed));
337
338        let now = std::time::SystemTime::now()
339            .duration_since(std::time::UNIX_EPOCH)?
340            .as_secs();
341
342        Ok(Self {
343            config,
344            backend,
345            nostr,
346            redeemer,
347            active_workloads: Arc::new(Mutex::new(HashMap::new())),
348            stats: Arc::new(Mutex::new(ProviderStats {
349                total_jobs_completed: 0,
350                uptime_start: now,
351            })),
352            state_machine: Arc::new(Mutex::new(WorkloadStateMachine::new(
353                QuorumConfig::default(),
354            ))),
355            observation_buffer: Arc::new(Mutex::new(Vec::new())),
356            standby_slots: Arc::new(Mutex::new(HashMap::new())),
357        })
358    }
359
360    /// Get the provider's public key (npub)
361    pub fn get_npub(&self) -> String {
362        self.nostr.get_service_public_key()
363    }
364
365    /// Start the provider service (runs forever)
366    pub async fn run(&self) -> Result<()> {
367        info!("🚀 Starting Paygress Provider Service");
368        info!("Provider: {}", self.config.provider_name);
369        info!("NPUB: {}", self.get_npub());
370
371        // Publish initial offer
372        self.publish_offer().await?;
373
374        // Run heartbeat loop, request listener, cleanup loop,
375        // orchestrator loop (Unit 5 wiring), and standby watchdog
376        // (closes the hard-crash failover gap) concurrently.
377        tokio::select! {
378            result = self.heartbeat_loop() => {
379                error!("Heartbeat loop exited: {:?}", result);
380                result
381            }
382            result = self.listen_for_requests() => {
383                error!("Request listener exited: {:?}", result);
384                result
385            }
386            result = self.cleanup_loop() => {
387                error!("Cleanup loop exited: {:?}", result);
388                result
389            }
390            result = self.orchestrator_loop() => {
391                error!("Orchestrator loop exited: {:?}", result);
392                result
393            }
394            result = self.standby_watchdog_loop() => {
395                error!("Standby watchdog loop exited: {:?}", result);
396                result
397            }
398        }
399    }
400
401    /// Publish provider offer to Nostr
402    async fn publish_offer(&self) -> Result<()> {
403        let stats = self.stats.lock().await;
404
405        let offer = ProviderOfferContent {
406            provider_npub: self.get_npub(),
407            hostname: self.config.provider_name.clone(),
408            location: self.config.provider_location.clone(),
409            capabilities: self.config.capabilities.clone(),
410            specs: self.config.specs.clone(),
411            whitelisted_mints: self.config.whitelisted_mints.clone(),
412            uptime_percent: 100.0, // Will be calculated from heartbeat history
413            total_jobs_completed: stats.total_jobs_completed,
414            api_endpoint: None, // TODO: Add if supporting direct API
415            version: crate::nostr::SCHEMA_VERSION,
416            // Derive isolation tier from the configured backend.
417            // KVM is per-VM (DedicatedHost). Docker / LXD / Proxmox
418            // share the host kernel (SharedKernel) — same tier as
419            // historical default. SEV-SNP / TDX gets its own
420            // backend later and bumps to AttestedResearchTier.
421            isolation_level: match self.config.backend_type {
422                BackendType::Kvm => crate::nostr::IsolationLevel::DedicatedHost,
423                BackendType::Proxmox | BackendType::LXD | BackendType::Docker => {
424                    crate::nostr::IsolationLevel::SharedKernel
425                }
426            },
427            stake_proof: None,
428        };
429
430        self.nostr.publish_provider_offer(offer).await?;
431        Ok(())
432    }
433
434    /// Send heartbeat every N seconds
435    async fn heartbeat_loop(&self) -> Result<()> {
436        let interval = tokio::time::Duration::from_secs(self.config.heartbeat_interval_secs);
437
438        loop {
439            if let Err(e) = self.send_heartbeat().await {
440                warn!("Failed to send heartbeat: {}", e);
441            }
442            tokio::time::sleep(interval).await;
443        }
444    }
445
446    /// Send a single heartbeat
447    async fn send_heartbeat(&self) -> Result<()> {
448        let workloads = self.active_workloads.lock().await;
449
450        // Get node status for capacity info
451        let capacity = match self.backend.get_node_status().await {
452            Ok(status) => CapacityInfo {
453                cpu_available: ((1.0 - status.cpu_usage) * 100000.0) as u64, // Convert to millicores
454                memory_mb_available: status.memory_total.saturating_sub(status.memory_used)
455                    / (1024 * 1024),
456                storage_gb_available: status.disk_total.saturating_sub(status.disk_used)
457                    / (1024 * 1024 * 1024),
458            },
459            Err(e) => {
460                warn!("Failed to get node status: {}", e);
461                CapacityInfo {
462                    cpu_available: 0,
463                    memory_mb_available: 0,
464                    storage_gb_available: 0,
465                }
466            }
467        };
468
469        let now = std::time::SystemTime::now()
470            .duration_since(std::time::UNIX_EPOCH)?
471            .as_secs();
472
473        let heartbeat = HeartbeatContent {
474            provider_npub: self.get_npub(),
475            timestamp: now,
476            active_workloads: workloads.len() as u32,
477            available_capacity: capacity,
478            version: crate::nostr::SCHEMA_VERSION,
479        };
480
481        let (_event_id, accepting_relays) = self.nostr.publish_heartbeat(heartbeat).await?;
482
483        // Push one HeartbeatObservation per accepting relay into the
484        // shared buffer. The orchestrator loop drains these on its
485        // next tick to drive the workload state machine. Observations
486        // use `now` for both `seen_at` and `event_timestamp` because
487        // we just published; if a relay didn't ACK we don't fabricate
488        // an observation for it.
489        if !accepting_relays.is_empty() {
490            let provider_npub = self.get_npub();
491            let mut buf = self.observation_buffer.lock().await;
492            for relay_url in accepting_relays {
493                buf.push(HeartbeatObservation {
494                    provider_npub: provider_npub.clone(),
495                    relay_url,
496                    seen_at: now,
497                    event_timestamp: now,
498                });
499            }
500        }
501
502        Ok(())
503    }
504
    /// Listen for spawn requests via NIP-17
    ///
    /// Subscribes to pod events. Each incoming event not authored by this
    /// provider is handled as follows:
    /// - If it parses as a lease revocation, promotion of the matching local
    ///   standby slot (if any) is scheduled, and no reply is sent.
    /// - Otherwise it is parsed as a private request and dispatched to the
    ///   spawn, status, or top-up handler. A parse failure is answered with an
    ///   `invalid_request` error reply. Handler errors are logged, not
    ///   propagated, so one bad request does not stop the listener.
    ///
    /// Returns when `subscribe_to_pod_events` returns (presumably only when the
    /// subscription ends or fails — TODO confirm).
    async fn listen_for_requests(&self) -> Result<()> {
        info!("Listening for Paygress requests...");

        // Clone what we need for the handler
        let backend = self.backend.clone();
        let config = self.config.clone();
        let nostr = self.nostr.clone();
        let redeemer = self.redeemer.clone();
        let workloads = self.active_workloads.clone();
        let stats = self.stats.clone();
        let state_machine = self.state_machine.clone();
        let standby_slots = self.standby_slots.clone();

        self.nostr
            .subscribe_to_pod_events(move |event| {
                // Per-event clones: the closure may be called many times, and
                // each returned future must own its captures.
                let backend = backend.clone();
                let config = config.clone();
                let nostr = nostr.clone();
                let redeemer = redeemer.clone();
                let workloads = workloads.clone();
                let stats = stats.clone();
                let state_machine = state_machine.clone();
                let standby_slots = standby_slots.clone();

                Box::pin(async move {
                    // Skip events this provider published itself.
                    let my_pubkey = nostr.public_key().to_hex();
                    if event.pubkey == my_pubkey {
                        return Ok(());
                    }

                    debug!(
                        "Handler received event kind: {}, from: {}, message_type: {}",
                        event.kind, event.pubkey, event.message_type
                    );

                    // Lease revocation events take a separate path
                    // (Unit 5 standby-side promotion). Public events,
                    // no decryption, no response.
                    if let Some(revocation) = crate::nostr::parse_revocation_event(&event) {
                        info!(
                            "Lease revocation observed: workload_id={}, primary={}, reason={}, state_uri={:?}, standbys={:?}",
                            revocation.workload_id,
                            revocation.primary_provider_npub,
                            revocation.reason,
                            revocation.state_uri,
                            revocation.standby_providers,
                        );
                        // Look up the matching standby slot. If
                        // present, schedule the promotion task; the
                        // ordered backoff inside that task gives us
                        // single-writer across N standbys (best-effort
                        // — see scheduler for the v1 caveat).
                        let workload_id = revocation.workload_id.clone();
                        let primary_npub = revocation.primary_provider_npub.clone();
                        // The slot map lock is a temporary here and is released
                        // at the end of this statement.
                        let slot_opt = standby_slots.lock().await.get(&workload_id).cloned();
                        if let Some(slot) = slot_opt {
                            // NOTE(review): the only authenticity check in this
                            // function is that the revocation's *claimed*
                            // `primary_provider_npub` matches the slot.
                            // `event.pubkey` (the actual signer) is never compared
                            // against the primary. Unless `parse_revocation_event`
                            // verifies the author, anyone who knows the workload_id
                            // and primary npub could trigger a standby promotion.
                            // Confirm.
                            if slot.primary_npub != primary_npub {
                                warn!(
                                    "Revocation primary_npub ({}) does not match slot's primary ({}); ignoring",
                                    primary_npub, slot.primary_npub
                                );
                                return Ok(());
                            }
                            // Not awaited here (presumably spawns a background
                            // task — TODO confirm).
                            schedule_standby_promotion(
                                backend.clone(),
                                workloads.clone(),
                                state_machine.clone(),
                                standby_slots.clone(),
                                nostr.clone(),
                                slot,
                            );
                        } else {
                            debug!(
                                "Revocation workload_id={} did not match any local standby slot; ignoring",
                                workload_id
                            );
                        }
                        return Ok(());
                    }

                    // Parse the request
                    let request_type = match parse_private_message_content(&event.content) {
                        Ok(req) => req,
                        Err(e) => {
                            warn!("Failed to parse request from {}: {}", event.pubkey, e);
                            let error = ErrorResponseContent {
                                error_type: "invalid_request".to_string(),
                                message: "Failed to parse request".to_string(),
                                details: Some(e.to_string()),
                            };
                            // Best-effort reply; a send failure is deliberately ignored.
                            let _ = nostr
                                .send_error_response_private_message(
                                    &event.pubkey,
                                    error,
                                    &event.message_type,
                                )
                                .await;
                            return Ok(());
                        }
                    };

                    debug!("Successfully parsed request metadata");

                    // Dispatch to specific handler
                    match request_type {
                        PrivateRequest::Spawn(spawn_req) => {
                            if let Err(e) = handle_spawn_request(
                                backend.as_ref(),
                                &config,
                                &nostr,
                                redeemer.as_ref(),
                                &workloads,
                                &stats,
                                &state_machine,
                                &standby_slots,
                                &event.pubkey,
                                &event.message_type,
                                spawn_req,
                            )
                            .await
                            {
                                error!("Failed to handle spawn request: {}", e);
                            }
                        }
                        PrivateRequest::Status(status_req) => {
                            if let Err(e) = handle_status_request(
                                &config,
                                &nostr,
                                &workloads,
                                &event.pubkey,
                                &event.message_type,
                                status_req,
                            )
                            .await
                            {
                                error!("Failed to handle status request: {}", e);
                            }
                        }
                        PrivateRequest::TopUp(topup_req) => {
                            if let Err(e) = handle_topup_request(
                                &config,
                                &nostr,
                                redeemer.as_ref(),
                                &workloads,
                                &event.pubkey,
                                &event.message_type,
                                topup_req,
                            )
                            .await
                            {
                                error!("Failed to handle topup request: {}", e);
                            }
                        }
                    }

                    Ok(())
                })
            })
            .await?;

        Ok(())
    }
668
669    /// Orchestrator loop (Unit 5 wiring).
670    ///
671    /// Every 15s, drain the observation buffer, advance the workload
672    /// state machine, and act on each emitted `StateMachineEvent`.
673    /// 15s is chosen to be much shorter than `t1=120s` and `t2=300s`
674    /// so transitions are detected promptly, but not so short that
675    /// idle providers churn — the underlying state machine is a pure
676    /// function and the work is bounded by the number of tracked
677    /// workloads.
678    async fn orchestrator_loop(&self) -> Result<()> {
679        let interval = tokio::time::Duration::from_secs(15);
680        info!("Orchestrator loop starting (cadence: 15s)");
681
682        loop {
683            tokio::time::sleep(interval).await;
684
685            let now = std::time::SystemTime::now()
686                .duration_since(std::time::UNIX_EPOCH)?
687                .as_secs();
688
689            // Drain buffered observations. We pass them to the state
690            // machine and clear the buffer so the next tick only sees
691            // newer observations — a stale ACK older than `stale_secs`
692            // would already be ignored by the state machine, but
693            // draining keeps the buffer bounded regardless.
694            let observations: Vec<HeartbeatObservation> = {
695                let mut buf = self.observation_buffer.lock().await;
696                std::mem::take(&mut *buf)
697            };
698
699            // Tick the state machine. Holds the state machine lock
700            // across the tick (a short, pure operation).
701            let events = {
702                let mut sm = self.state_machine.lock().await;
703                sm.tick(now, &observations)
704            };
705
706            if events.is_empty() {
707                continue;
708            }
709
710            for event in events {
711                self.handle_state_machine_event(event, now).await;
712            }
713        }
714    }
715
716    /// Translate one `StateMachineEvent` into provider actions.
717    /// Logged regardless; events that require I/O (revocation publish,
718    /// respawn) are dispatched to the appropriate subsystem.
719    async fn handle_state_machine_event(&self, event: StateMachineEvent, now: u64) {
720        match event {
721            StateMachineEvent::EnteredLive { workload_id } => {
722                info!("Workload {} entered Live", workload_id);
723            }
724            StateMachineEvent::EnteredSuspect { workload_id } => {
725                warn!(
726                    "Workload {} entered Suspect (heartbeat quorum lost)",
727                    workload_id
728                );
729            }
730            StateMachineEvent::Evicted {
731                workload_id,
732                reason,
733            } => {
734                error!("Workload {} evicted: {}", workload_id, reason);
735            }
736            StateMachineEvent::PublishLeaseRevocation {
737                workload_id,
738                standby_providers,
739            } => {
740                // Look up the workload's consumer-assigned UUID (set
741                // at spawn time from request.workload_id). If absent,
742                // fall back to a derived id based on the local vmid —
743                // this case is unreachable for a real warm-standby
744                // workload because the spawn handler enforces the
745                // UUID, but the fallback keeps the publish call
746                // total in case of state-machine bugs.
747                let (consumer_workload_id, state_uri) = {
748                    let lock = self.active_workloads.lock().await;
749                    let entry = lock.get(&workload_id);
750                    let cid = entry
751                        .and_then(|w| w.consumer_workload_id.clone())
752                        .unwrap_or_else(|| format!("vmid-{}", workload_id));
753                    let suri = entry.and_then(|w| w.state_uri.clone());
754                    (cid, suri)
755                };
756                let revocation = LeaseRevocationContent {
757                    workload_id: consumer_workload_id.clone(),
758                    primary_provider_npub: self.get_npub(),
759                    standby_providers: standby_providers.clone(),
760                    reason: "heartbeat-quorum-lost-past-t2".to_string(),
761                    revoked_at: now,
762                    state_uri,
763                    version: crate::nostr::SCHEMA_VERSION,
764                };
765                match self.nostr.publish_lease_revocation(revocation).await {
766                    Ok(event_id) => info!(
767                        "Published lease revocation for workload {} (vmid {}) to {} standby(s): {}",
768                        consumer_workload_id,
769                        workload_id,
770                        standby_providers.len(),
771                        event_id
772                    ),
773                    Err(e) => error!(
774                        "Failed to publish lease revocation for workload {}: {}",
775                        workload_id, e
776                    ),
777                }
778            }
779            StateMachineEvent::AttemptRespawn {
780                workload_id,
781                attempt,
782            } => {
783                info!(
784                    "Attempting respawn of workload {} (attempt {})",
785                    workload_id, attempt
786                );
787                // The full respawn path requires reconstructing the
788                // ContainerConfig from the original spawn, which lives
789                // in active_workloads only as `WorkloadInfo` (no image
790                // / port mapping retained). Capturing the original
791                // ContainerConfig is a follow-up. For now we record
792                // the failure so the state machine can retry / fail
793                // out deterministically rather than hanging in
794                // Respawning forever.
795                let mut sm = self.state_machine.lock().await;
796                sm.notify_respawn_failed(
797                    workload_id,
798                    "respawn handler not yet implemented (follow-up)",
799                );
800            }
801            StateMachineEvent::Failed {
802                workload_id,
803                reason,
804            } => {
805                error!("Workload {} marked Failed: {}", workload_id, reason);
806                // Drop from active_workloads so cleanup_loop doesn't
807                // also try to delete a container that was never
808                // successfully respawned.
809                let mut wl = self.active_workloads.lock().await;
810                wl.remove(&workload_id);
811            }
812        }
813    }
814
815    /// Cleanup expired workloads
816    async fn cleanup_loop(&self) -> Result<()> {
817        let interval = tokio::time::Duration::from_secs(30);
818
819        loop {
820            tokio::time::sleep(interval).await;
821
822            let now = std::time::SystemTime::now()
823                .duration_since(std::time::UNIX_EPOCH)?
824                .as_secs();
825
826            let mut workloads = self.active_workloads.lock().await;
827            let expired: Vec<u32> = workloads
828                .iter()
829                .filter(|(_, w)| w.expires_at <= now)
830                .map(|(vmid, _)| *vmid)
831                .collect();
832
833            for vmid in expired {
834                info!("Cleaning up expired workload: {}", vmid);
835
836                if let Some(_workload) = workloads.remove(&vmid) {
837                    let stop_result = self.backend.stop_container(vmid).await;
838                    let result = match stop_result {
839                        Ok(_) => self.backend.delete_container(vmid).await,
840                        Err(e) => Err(e),
841                    };
842
843                    // Untrack from the state machine regardless of
844                    // backend stop/delete success — the lease is over,
845                    // and we don't want the orchestrator continuing
846                    // to drive transitions on a workload nobody is
847                    // serving anymore.
848                    self.state_machine.lock().await.untrack(vmid);
849
850                    match result {
851                        Ok(_) => {
852                            info!("Cleaned up workload {}", vmid);
853                            let mut stats = self.stats.lock().await;
854                            stats.total_jobs_completed += 1;
855                        }
856                        Err(e) => error!("Failed to cleanup workload {}: {}", vmid, e),
857                    }
858                }
859            }
860            drop(workloads);
861
862            // Expire reserved-but-never-promoted standby slots whose
863            // lease window has passed. Without this, a standby slot
864            // for a workload that never failed over (the common case
865            // — primaries usually outlive their lease) accumulates in
866            // the slots map until process restart. The watchdog skips
867            // promotion-on-silence checks for past-expiry slots
868            // anyway, but the memory grows unbounded across long-
869            // running providers serving many warm-standby workloads.
870            let mut slots = self.standby_slots.lock().await;
871            let expired_slots: Vec<String> = slots
872                .iter()
873                .filter(|(_, slot)| slot.expires_at <= now)
874                .map(|(workload_id, _)| workload_id.clone())
875                .collect();
876            for workload_id in expired_slots {
877                if let Some(slot) = slots.remove(&workload_id) {
878                    info!(
879                        "Expiring standby slot for workload {} (index {}/{}, primary {}, expired at {})",
880                        workload_id, slot.standby_index, slot.standby_count, slot.primary_npub, slot.expires_at
881                    );
882                }
883            }
884        }
885    }
886
887    /// Standby watchdog: detect a primary that has stopped publishing
888    /// heartbeats and promote ourselves on its behalf.
889    ///
890    /// Why this exists
891    /// ---------------
892    /// PR #43 wired the standby's `LeaseRevocation` listener: when
893    /// the primary's orchestrator emits a revocation event (graceful
894    /// "I can no longer keep this workload alive"), the standby
895    /// promotes after `index * STANDBY_PROMOTION_DELAY_SECS`. That
896    /// covers the *graceful* failover case — primary still has
897    /// network access to publish, just decided to give up the lease.
898    ///
899    /// It does NOT cover **hard crash**: primary process dies, host
900    /// loses network, kernel panics, etc. No revocation event ever
901    /// fires; standbys never promote. Without this loop, paygress's
902    /// warm-standby promise reduces to "high-availability against
903    /// the workload itself dying, not against the provider hosting
904    /// it dying" — which is the more common failure mode.
905    ///
906    /// How it works
907    /// ------------
908    /// Every `STANDBY_WATCHDOG_INTERVAL_SECS`, the standby:
909    ///   1. Snapshots its `standby_slots`.
910    ///   2. Batches the unique primary npubs across those slots and
911    ///      asks Nostr for the latest heartbeat per primary.
912    ///   3. For each slot whose primary's last heartbeat is older
913    ///      than `STANDBY_HEARTBEAT_SILENCE_SECS`, fires
914    ///      `schedule_standby_promotion(slot)`.
915    ///
916    /// Two layers of dedup keep at most one promotion per workload:
917    ///
918    ///  - **Intra-process** (watchdog vs revocation listener): both
919    ///    callers funnel through `schedule_standby_promotion`, which
920    ///    `slot.remove`s atomically inside its spawned task — first
921    ///    caller wins, second gets `None` and returns.
922    ///  - **Inter-process** (peer standbys racing for the same
923    ///    workload after a primary hard-crash): the winner publishes
924    ///    a `KIND_STANDBY_PROMOTION_ANNOUNCEMENT` event immediately
925    ///    after spawning; higher-indexed peers query for it during
926    ///    their own promotion task (after the per-index backoff)
927    ///    and drop their slot without spawning if they see one.
928    async fn standby_watchdog_loop(&self) -> Result<()> {
929        let interval = tokio::time::Duration::from_secs(STANDBY_WATCHDOG_INTERVAL_SECS);
930        info!(
931            "Standby watchdog loop starting (cadence: {}s, silence threshold: {}s)",
932            STANDBY_WATCHDOG_INTERVAL_SECS, STANDBY_HEARTBEAT_SILENCE_SECS
933        );
934
935        loop {
936            tokio::time::sleep(interval).await;
937
938            let slots: Vec<StandbySlot> = {
939                let lock = self.standby_slots.lock().await;
940                lock.values().cloned().collect()
941            };
942            if slots.is_empty() {
943                continue;
944            }
945
946            // Dedupe primaries — many slots may share one primary
947            // (single workload with N standbys), no point querying
948            // its heartbeat once per slot.
949            let primary_npubs: Vec<String> = slots
950                .iter()
951                .map(|s| s.primary_npub.clone())
952                .collect::<std::collections::HashSet<_>>()
953                .into_iter()
954                .collect();
955
956            let heartbeats = match self.nostr.get_latest_heartbeats_multi(primary_npubs).await {
957                Ok(hb) => hb,
958                Err(e) => {
959                    warn!(
960                        "standby watchdog: heartbeat batch query failed: {}; \
961                         skipping this tick (will retry next interval)",
962                        e
963                    );
964                    continue;
965                }
966            };
967
968            let now = std::time::SystemTime::now()
969                .duration_since(std::time::UNIX_EPOCH)?
970                .as_secs();
971
972            for slot in slots {
973                let last_seen = heartbeats
974                    .get(&slot.primary_npub)
975                    .map(|hb| hb.timestamp)
976                    .unwrap_or(0);
977                // When no heartbeat has been observed yet (last_seen == 0),
978                // fall back to the slot's reservation timestamp. This
979                // gives a fresh standby a full silence-window of grace
980                // before promoting — otherwise the watchdog would fire
981                // on the first tick (0-30s after spawn) and promote a
982                // healthy primary that simply hasn't published its next
983                // heartbeat yet.
984                let silence_baseline = if last_seen == 0 {
985                    slot.created_at
986                } else {
987                    last_seen
988                };
989                if !primary_is_silent(now, silence_baseline, STANDBY_HEARTBEAT_SILENCE_SECS) {
990                    continue;
991                }
992                let silence_secs = now.saturating_sub(silence_baseline);
993                warn!(
994                    "Primary {} silent for {}s on slot workload_id={} (threshold {}s); \
995                     triggering standby promotion (assumes hard crash; deduped \
996                     intra-process via slot.remove and inter-process via the \
997                     promotion-announcement event published post-spawn)",
998                    slot.primary_npub,
999                    silence_secs,
1000                    slot.workload_id,
1001                    STANDBY_HEARTBEAT_SILENCE_SECS
1002                );
1003                schedule_standby_promotion(
1004                    self.backend.clone(),
1005                    self.active_workloads.clone(),
1006                    self.state_machine.clone(),
1007                    self.standby_slots.clone(),
1008                    self.nostr.clone(),
1009                    slot,
1010                );
1011            }
1012        }
1013    }
1014}
1015
/// How often, in seconds, the standby watchdog re-checks primary
/// heartbeats. It uses the same 30s period as `cleanup_loop`, so no new
/// cadence is introduced on the host.
const STANDBY_WATCHDOG_INTERVAL_SECS: u64 = 30;

/// Seconds of primary heartbeat silence after which a standby treats the
/// primary as crashed.
///
/// The value is three periods of the default 60s heartbeat cadence. The
/// primary therefore gets roughly two missed beats of grace (a transient
/// relay flake or a brief network blip) before promotion fires. Making
/// this configurable through `ProviderConfig` is a follow-up. For now it
/// is a constant, so every standby uses the same window.
const STANDBY_HEARTBEAT_SILENCE_SECS: u64 = 3 * 60;
1028
/// Decide whether a primary has been quiet for long enough to count as
/// silent.
///
/// `now` and `baseline` are timestamps in the same unit. `baseline` is
/// the reference point the caller chose: normally the most recent primary
/// heartbeat, or the slot's reservation time when no heartbeat has been
/// seen. This helper does not choose the baseline; it only compares
/// elapsed time against `threshold`.
///
/// A `baseline` of `0` means "no reference point supplied" and always
/// yields `false`. A missing baseline is treated as a caller mistake, and
/// skipping a promotion is safer than promoting spuriously. A `baseline`
/// later than `now` counts as zero elapsed time.
///
/// Returns `true` exactly when `now - baseline >= threshold`.
fn primary_is_silent(now: u64, baseline: u64, threshold: u64) -> bool {
    match baseline {
        0 => false,
        seen => now.saturating_sub(seen) >= threshold,
    }
}
1050
// The manual Clone impl was removed because ComputeBackend is now held behind an Arc.
1052
1053/// Handle a spawn request.
1054///
1055/// Redeems the provided Cashu token at the mint via the supplied
1056/// `MintRedeemer` (Unit 1 — see docs/plans/...). On any redemption
1057/// failure (invalid token, non-whitelisted mint, already-spent,
1058/// pending, network) we reply with a structured error and DO NOT call
1059/// the backend — no container is created without a successful swap.
1060async fn handle_spawn_request(
1061    backend: &dyn ComputeBackend,
1062    config: &ProviderConfig,
1063    nostr: &NostrRelaySubscriber,
1064    redeemer: &dyn MintRedeemer,
1065    workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
1066    stats: &Arc<Mutex<ProviderStats>>,
1067    state_machine: &Arc<Mutex<WorkloadStateMachine>>,
1068    standby_slots: &Arc<Mutex<HashMap<String, StandbySlot>>>,
1069    requester_pubkey: &str,
1070    message_type: &str,
1071    request: EncryptedSpawnPodRequest,
1072) -> Result<()> {
1073    info!(
1074        "Processing spawn request from {} (tier: {:?})",
1075        requester_pubkey, request.pod_spec_id
1076    );
1077
1078    // Self-role detection for warm-standby spawns. The consumer
1079    // sends the SAME shape of request to N+1 providers — the
1080    // primary and each standby. Each provider compares its own
1081    // npub against the request's primary_npub / standby_providers
1082    // to figure out which path to take. Single-replication spawns
1083    // (the common case) skip this entirely.
1084    let role = compute_warm_standby_role(&nostr.get_service_public_key(), &request);
1085    if matches!(role, WarmStandbyRole::NotAddressed) {
1086        if request
1087            .replication
1088            .as_ref()
1089            .map(|r| {
1090                matches!(
1091                    r,
1092                    crate::durable_workload::ReplicationMode::WarmStandby { .. }
1093                )
1094            })
1095            .unwrap_or(false)
1096        {
1097            // The consumer set replication=WarmStandby but neither
1098            // designated us as primary nor included us in the
1099            // standby list. Refuse to spend the token — they sent
1100            // to the wrong provider.
1101            let err_msg =
1102                "warm-standby spawn arrived at a provider not designated as primary or standby";
1103            warn!("{}", err_msg);
1104            nostr
1105                .send_error_response(
1106                    requester_pubkey,
1107                    "not_addressed",
1108                    err_msg,
1109                    None,
1110                    message_type,
1111                )
1112                .await?;
1113            return Ok(());
1114        }
1115    }
1116
1117    // 1. Redeem Cashu token at the mint (Unit 1).
1118    let payment_msats = match validate_and_redeem(
1119        redeemer,
1120        &config.whitelisted_mints,
1121        &request.cashu_token,
1122    )
1123    .await
1124    {
1125        Ok(v) => v,
1126        Err(e) => {
1127            let (error_type, err_msg) = redeem_error_to_response(&e);
1128            error!("Cashu redemption failed: {}", err_msg);
1129            nostr
1130                .send_error_response(requester_pubkey, error_type, &err_msg, None, message_type)
1131                .await?;
1132            return Ok(());
1133        }
1134    };
1135
1136    // 2. Find matching spec/tier
1137    let spec = match config
1138        .specs
1139        .iter()
1140        .find(|s| Some(s.id.clone()) == request.pod_spec_id)
1141    {
1142        Some(s) => s,
1143        None => {
1144            // Default to first spec if none specified or not found
1145            if let Some(s) = config.specs.first() {
1146                s
1147            } else {
1148                let err_msg = "No pod specifications available on this provider";
1149                error!("{}", err_msg);
1150                nostr
1151                    .send_error_response(requester_pubkey, "no_specs", err_msg, None, message_type)
1152                    .await?;
1153                return Ok(());
1154            }
1155        }
1156    };
1157
1158    // 3. Calculate Duration
1159    let duration_secs = payment_msats / spec.rate_msats_per_sec;
1160    if duration_secs < config.minimum_duration_seconds {
1161        let err_msg = format!(
1162            "Insufficient payment for minimum duration. Required: {} msats for {}s",
1163            config.minimum_duration_seconds * spec.rate_msats_per_sec,
1164            config.minimum_duration_seconds
1165        );
1166        warn!("{}", err_msg);
1167        nostr
1168            .send_error_response(
1169                requester_pubkey,
1170                "insufficient_payment",
1171                &err_msg,
1172                None,
1173                message_type,
1174            )
1175            .await?;
1176        return Ok(());
1177    }
1178
1179    info!(
1180        "Validated payment: {} msats for {}s on tier {}",
1181        payment_msats, duration_secs, spec.name
1182    );
1183
1184    // 4. Find available ID
1185    let id = match backend
1186        .find_available_id(config.vmid_range_start, config.vmid_range_end)
1187        .await
1188    {
1189        Ok(id) => id,
1190        Err(e) => {
1191            let err_msg = format!("Failed to find available ID: {}", e);
1192            error!("{}", err_msg);
1193            nostr
1194                .send_error_response(
1195                    requester_pubkey,
1196                    "provisioning_error",
1197                    &err_msg,
1198                    None,
1199                    message_type,
1200                )
1201                .await?;
1202            return Ok(());
1203        }
1204    };
1205
1206    // 5. Generate credentials
1207    let password = generate_password();
1208
1209    // Calculate host port for SSH forwarding (LXD/Proxmox path).
1210    let host_port = match config.ssh_port_start {
1211        Some(start) => start + (id - config.vmid_range_start) as u16,
1212        None => 30000 + (id % 10000) as u16,
1213    };
1214
1215    // 6. Resolve template (if requested) — image + ports + env come
1216    //    from the provider's OWN local registry, not consumer bytes.
1217    //    Unknown slugs are rejected so a consumer can't probe for
1218    //    accepted templates by sending arbitrary strings.
1219    let template = if let Some(slug) = request.template_slug.as_deref() {
1220        match TemplateName::from_slug(slug) {
1221            Some(name) => Some(TemplateDefinition::lookup(name)),
1222            None => {
1223                let err_msg = format!(
1224                    "Unknown template `{}` — provider does not advertise it",
1225                    slug
1226                );
1227                warn!("{}", err_msg);
1228                nostr
1229                    .send_error_response(
1230                        requester_pubkey,
1231                        "unknown_template",
1232                        &err_msg,
1233                        None,
1234                        message_type,
1235                    )
1236                    .await?;
1237                return Ok(());
1238            }
1239        }
1240    } else {
1241        None
1242    };
1243
1244    // Image: template wins over consumer-supplied (sandbox).
1245    let image = template
1246        .as_ref()
1247        .map(|t| t.image.to_string())
1248        .unwrap_or_else(|| request.pod_image.clone());
1249
1250    // Port mappings: each template port published on a host port
1251    // derived from `host_port` so multiple workloads on the same
1252    // provider don't collide. We allocate `host_port + i + 1` for
1253    // template port i (host_port itself stays for SSH where backends
1254    // care about it).
1255    let template_ports: Vec<PortMapping> = template
1256        .as_ref()
1257        .map(|t| {
1258            t.ports
1259                .iter()
1260                .enumerate()
1261                .map(|(i, p)| PortMapping {
1262                    host_port: host_port.saturating_add(1 + i as u16),
1263                    container_port: p.container_port,
1264                    protocol: "tcp",
1265                })
1266                .collect()
1267        })
1268        .unwrap_or_default();
1269
1270    let mut template_env: HashMap<String, String> = template
1271        .as_ref()
1272        .map(|t| {
1273            t.env
1274                .iter()
1275                .map(|(k, v)| (k.to_string(), v.to_string()))
1276                .collect()
1277        })
1278        .unwrap_or_default();
1279
1280    // For templates that bake in the paygress-exec HTTP server
1281    // (currently `agent-sandbox`), inject the same credentials the
1282    // consumer will see in AccessDetails — `root` + the
1283    // provider-generated `password`. The instructions block already
1284    // tells the consumer to use "root + this password", so the exec
1285    // server reusing those creds means there's exactly one secret
1286    // to manage per spawn.
1287    //
1288    // The template defaults EXEC_USER/EXEC_PASS to empty strings;
1289    // the server returns 503 until they're non-empty, so this
1290    // overlay is what unlocks /exec.
1291    if let Some(t) = template.as_ref() {
1292        if t.env.contains_key("EXEC_USER") {
1293            template_env.insert("EXEC_USER".to_string(), "root".to_string());
1294        }
1295        if t.env.contains_key("EXEC_PASS") {
1296            template_env.insert("EXEC_PASS".to_string(), password.clone());
1297        }
1298    }
1299
1300    let extra_runtime_args: Vec<String> = template
1301        .as_ref()
1302        .map(|t| t.extra_docker_args.iter().map(|s| s.to_string()).collect())
1303        .unwrap_or_default();
1304
1305    let data_path: Option<String> = template
1306        .as_ref()
1307        .and_then(|t| t.data_path.map(|p| p.to_string()));
1308
1309    // Volume encryption (Phase 2): decode the consumer-supplied key
1310    // from the spawn request. Silently None for stateless workloads
1311    // (data_path is None) since there's nothing to encrypt — saves
1312    // surfacing a confusing "key supplied but ignored" warning to a
1313    // consumer who set --encrypt-volume on a stateless template.
1314    let volume_encryption_key = match (&data_path, request.volume_encryption.as_ref()) {
1315        (Some(_), Some(ve)) => match ve.decoded_key() {
1316            Ok(key) => {
1317                info!(
1318                    "Spawn request includes volume_encryption (algorithm={}, version={}); will create LUKS-encrypted data volume",
1319                    ve.algorithm, ve.version
1320                );
1321                Some(key)
1322            }
1323            Err(e) => {
1324                error!(
1325                    "Rejecting spawn: malformed volume_encryption.key_b64: {}",
1326                    e
1327                );
1328                let err_payload = ErrorResponseContent {
1329                    error_type: "invalid_volume_encryption".to_string(),
1330                    message: format!("volume_encryption rejected: {}", e),
1331                    details: None,
1332                };
1333                let _ = nostr
1334                    .send_error_response_private_message(
1335                        requester_pubkey,
1336                        err_payload,
1337                        message_type,
1338                    )
1339                    .await;
1340                return Ok(());
1341            }
1342        },
1343        (None, Some(_)) => {
1344            warn!(
1345                "Spawn request set volume_encryption but template has no data_path; encryption is a no-op for stateless workloads"
1346            );
1347            None
1348        }
1349        _ => None,
1350    };
1351
1352    // 7. Create Container
1353    let container_config = ContainerConfig {
1354        id,
1355        name: format!("paygress-{}", id),
1356        image,
1357        cpu_cores: (spec.cpu_millicores / 1000).max(1) as u32,
1358        memory_mb: spec.memory_mb as u32,
1359        storage_gb: 10, // Default 10GB
1360        password: password.clone(),
1361        ssh_key: None,
1362        host_port: Some(host_port),
1363        template_ports,
1364        template_env,
1365        extra_runtime_args,
1366        data_path,
1367        volume_encryption_key,
1368    };
1369
1370    // ---- Standby branch ----
1371    //
1372    // If self is a standby for this workload (warm-standby spawn,
1373    // self.npub in standby_providers), DON'T create the container
1374    // yet. Reserve the slot, return a standby-confirmation
1375    // AccessDetails to the consumer, and wait for a
1376    // `LeaseRevocation` from the primary to trigger promotion.
1377    //
1378    // The token has already been redeemed at step 1, so the
1379    // consumer paid for the reservation — providers earn revenue
1380    // for offering standby capacity even if the primary never
1381    // fails over.
1382    if let WarmStandbyRole::Standby { index, count } = role {
1383        let workload_id = match request.workload_id.as_deref() {
1384            Some(id) if !id.is_empty() => id.to_string(),
1385            _ => {
1386                let err_msg = "warm-standby spawn missing workload_id (consumer-assigned UUID required to coordinate primary + standbys)";
1387                warn!("{}", err_msg);
1388                nostr
1389                    .send_error_response(
1390                        requester_pubkey,
1391                        "missing_workload_id",
1392                        err_msg,
1393                        None,
1394                        message_type,
1395                    )
1396                    .await?;
1397                return Ok(());
1398            }
1399        };
1400        let primary_npub = request.primary_npub.clone().unwrap_or_default();
1401        let now = std::time::SystemTime::now()
1402            .duration_since(std::time::UNIX_EPOCH)?
1403            .as_secs();
1404        // Compute peer-standby npubs: the full standby set minus
1405        // self. Used at promotion-time to detect that a lower-indexed
1406        // peer has already promoted (their fresh heartbeat under
1407        // their own npub is the dedup signal). For non-WarmStandby
1408        // replication this branch is unreachable (role would be
1409        // Primary), so we can safely default to empty.
1410        let self_npub = nostr.get_service_public_key();
1411        let peer_standby_npubs: Vec<String> = match request.replication.as_ref() {
1412            Some(crate::durable_workload::ReplicationMode::WarmStandby { standby_providers }) => {
1413                standby_providers
1414                    .iter()
1415                    .filter(|p| !crate::nostr::npubs_equal(p, &self_npub))
1416                    .cloned()
1417                    .collect()
1418            }
1419            _ => Vec::new(),
1420        };
1421        let slot = StandbySlot {
1422            workload_id: workload_id.clone(),
1423            primary_npub,
1424            standby_index: index,
1425            standby_count: count,
1426            container_config: container_config.clone(),
1427            spec_id: spec.id.clone(),
1428            expires_at: now + duration_secs,
1429            owner_npub: requester_pubkey.to_string(),
1430            created_at: now,
1431            peer_standby_npubs,
1432        };
1433        info!(
1434            "Reserved standby slot for workload_id={} (index {}/{}, expires at {})",
1435            workload_id, index, count, slot.expires_at
1436        );
1437        standby_slots.lock().await.insert(workload_id.clone(), slot);
1438
1439        // Send a structured "standby reserved" AccessDetails so the
1440        // consumer-side coordinator can confirm the reservation
1441        // landed. Reuse AccessDetailsContent's shape with a
1442        // distinguishing instructions block; adding a new
1443        // dedicated content type would be a wire-schema bump for
1444        // a single edge case.
1445        let expires_dt =
1446            chrono::DateTime::from_timestamp((now + duration_secs) as i64, 0).unwrap_or_default();
1447        let details = AccessDetailsContent {
1448            pod_npub: format!("standby-slot-{}", workload_id),
1449            node_port: 0, // No live container yet; 0 signals "reserved, not running"
1450            expires_at: expires_dt.to_rfc3339(),
1451            cpu_millicores: spec.cpu_millicores,
1452            memory_mb: spec.memory_mb,
1453            pod_spec_name: spec.name.clone(),
1454            pod_spec_description: spec.description.clone(),
1455            instructions: vec![
1456                format!(
1457                    "🛏️  Standby slot reserved (index {}/{} for workload {}).",
1458                    index, count, workload_id
1459                ),
1460                format!(
1461                    "Will promote on LeaseRevocation event from primary {}.",
1462                    request.primary_npub.as_deref().unwrap_or("(unset)")
1463                ),
1464                format!(
1465                    "Expected promotion delay: {} seconds (index * 30s backoff).",
1466                    index * 30
1467                ),
1468            ],
1469            host_address: config.public_ip.clone(),
1470            template_ports: Vec::new(),
1471        };
1472        nostr
1473            .send_access_details_private_message(requester_pubkey, details, message_type)
1474            .await?;
1475        return Ok(());
1476    }
1477
1478    debug!("Calling backend.create_container for workload {}", id);
1479    if let Err(e) = backend.create_container(&container_config).await {
1480        let err_msg = format!("Backend failed to create workload: {}", e);
1481        error!("{}", err_msg);
1482        nostr
1483            .send_error_response(
1484                requester_pubkey,
1485                "backend_error",
1486                &err_msg,
1487                None,
1488                message_type,
1489            )
1490            .await?;
1491        return Ok(());
1492    }
1493    debug!("Successfully created container {}", id);
1494
1495    let now = std::time::SystemTime::now()
1496        .duration_since(std::time::UNIX_EPOCH)?
1497        .as_secs();
1498
1499    // 7. Track Workload.
1500    //
1501    // Replication mode flows from the consumer's spawn request. Old
1502    // clients (no `replication` field) default to None — identical
1503    // behavior to before this PR. New clients can opt into
1504    // `WarmStandby { standby_providers }` by sending the same spawn
1505    // request to every provider in the set; the orchestrator's
1506    // `PublishLeaseRevocation` event from #34 then has a real
1507    // `standby_providers` list to address. Standby-side promotion
1508    // (subscribing + acting on incoming revocations) lands in a
1509    // follow-up.
1510    let replication = request
1511        .replication
1512        .clone()
1513        .unwrap_or_else(crate::durable_workload::ReplicationMode::default);
1514    let workload = WorkloadInfo {
1515        vmid: id,
1516        workload_type: "lxc".to_string(), // Default for Proxmox/LXD
1517        spec_id: spec.id.clone(),
1518        created_at: now,
1519        expires_at: now + duration_secs,
1520        owner_npub: requester_pubkey.to_string(),
1521        replication,
1522        restart_policy: crate::durable_workload::RestartPolicy::default(),
1523        state_uri: None,
1524        // Carried through from the spawn request so the orchestrator
1525        // can publish revocations addressed to the consumer's UUID
1526        // (which standbys key their slot table by).
1527        consumer_workload_id: request.workload_id.clone().filter(|s| !s.is_empty()),
1528    };
1529
1530    workloads.lock().await.insert(id, workload.clone());
1531
1532    // Register the workload with the state machine (Unit 5 wiring).
1533    // Starts in `Provisioning`; the orchestrator promotes it to
1534    // `Live` on the first observation tick that sees quorum.
1535    state_machine.lock().await.track(DurableWorkload {
1536        workload_id: id,
1537        provider_npub: nostr.get_service_public_key(),
1538        state: WorkloadState::Provisioning { since: now },
1539        replication: workload.replication.clone(),
1540        restart_policy: workload.restart_policy,
1541        state_uri: workload.state_uri.clone(),
1542        created_at: now,
1543        expires_at: workload.expires_at,
1544    });
1545
1546    // Update stats
1547    {
1548        let mut s = stats.lock().await;
1549        s.total_jobs_completed += 1;
1550    }
1551
1552    // 8. Get Access Details
1553    // Use configured public IP/host
1554    let host = &config.public_ip;
1555
1556    // Build per-template ports with their template labels so the
1557    // consumer doesn't have to remember the host_port + 1 + i rule.
1558    // We zip back to the source TemplateDefinition by matching on
1559    // container_port (each template's ports are unique by
1560    // container_port today).
1561    let template_access_ports: Vec<crate::nostr::TemplateAccessPort> = container_config
1562        .template_ports
1563        .iter()
1564        .map(|p| {
1565            let label = template
1566                .as_ref()
1567                .and_then(|t| {
1568                    t.ports
1569                        .iter()
1570                        .find(|tp| tp.container_port == p.container_port)
1571                })
1572                .map(|tp| tp.label.to_string())
1573                .unwrap_or_else(|| format!("port-{}", p.container_port));
1574            crate::nostr::TemplateAccessPort {
1575                host_port: p.host_port,
1576                container_port: p.container_port,
1577                protocol: p.protocol.to_string(),
1578                label,
1579            }
1580        })
1581        .collect();
1582
1583    // Send access details
1584    let expires_dt =
1585        chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
1586
1587    // Instructions: keep the SSH lines for legacy/manual access,
1588    // append per-template-port lines so humans see them too.
1589    let mut instructions = vec![
1590        format!("🚀 Workload provisioned successfully!"),
1591        format!("👤 Username: root"),
1592        format!("🔑 Password: {}", password),
1593        format!("⌛ Expires: {}", expires_dt.format("%Y-%m-%d %H:%M:%S UTC")),
1594        format!("Access: You can connect to the container using SSH."),
1595        format!("  ssh -p {} root@{}", host_port, host),
1596    ];
1597    if !template_access_ports.is_empty() {
1598        instructions.push(format!("Workload ports:"));
1599        for p in &template_access_ports {
1600            instructions.push(format!(
1601                "  {} ({}): {}://{}:{}",
1602                p.label, p.protocol, p.protocol, host, p.host_port
1603            ));
1604        }
1605    }
1606
1607    let details = AccessDetailsContent {
1608        pod_npub: format!("container-{}", id),
1609        node_port: host_port,
1610        expires_at: expires_dt.to_rfc3339(),
1611        cpu_millicores: spec.cpu_millicores,
1612        memory_mb: spec.memory_mb,
1613        pod_spec_name: spec.name.clone(),
1614        pod_spec_description: spec.description.clone(),
1615        instructions,
1616        host_address: host.clone(),
1617        template_ports: template_access_ports,
1618    };
1619
1620    debug!("Sending access details to {}", requester_pubkey);
1621    nostr
1622        .send_access_details_private_message(requester_pubkey, details, message_type)
1623        .await?;
1624
1625    debug!("Access details sent successfully");
1626
1627    info!("Workload {} provisioned for {} seconds", id, duration_secs);
1628    Ok(())
1629}
1630
1631/// Handle a TopUp request (Unit 2 of the 12-month plan).
1632///
1633/// Looks up the workload by its `pod_npub` (which the spawn handler
1634/// returned as `container-<vmid>`), verifies the requester owns it,
1635/// redeems the supplied Cashu token at the mint (Unit 1), and
1636/// extends `expires_at` by `redeemed_msats / spec.rate_msats_per_sec`
1637/// under the existing workloads mutex.
1638///
1639/// Mutex discipline: redemption (a network call to the mint) happens
1640/// BEFORE we re-acquire the workloads lock, so the lock is never held
1641/// across an external request.
1642async fn handle_topup_request(
1643    config: &ProviderConfig,
1644    nostr: &NostrRelaySubscriber,
1645    redeemer: &dyn MintRedeemer,
1646    workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
1647    requester_pubkey: &str,
1648    message_type: &str,
1649    request: EncryptedTopUpPodRequest,
1650) -> Result<()> {
1651    info!(
1652        "Processing topup request from {} for {}",
1653        requester_pubkey, request.pod_npub
1654    );
1655
1656    let vmid = match parse_pod_npub(&request.pod_npub) {
1657        Some(v) => v,
1658        None => {
1659            let err_msg = format!(
1660                "Could not parse pod identifier `{}`; expected `container-<id>` or numeric id",
1661                request.pod_npub
1662            );
1663            warn!("{}", err_msg);
1664            nostr
1665                .send_error_response(
1666                    requester_pubkey,
1667                    "invalid_pod_id",
1668                    &err_msg,
1669                    None,
1670                    message_type,
1671                )
1672                .await?;
1673            return Ok(());
1674        }
1675    };
1676
1677    // 1. Snapshot the workload + spec under a brief read-only lock so
1678    //    we know how to bill before we redeem.
1679    let now = std::time::SystemTime::now()
1680        .duration_since(std::time::UNIX_EPOCH)?
1681        .as_secs();
1682
1683    let (spec_id, current_expires_at) = {
1684        let lock = workloads.lock().await;
1685        match lock.get(&vmid) {
1686            Some(w) if w.owner_npub == requester_pubkey => (w.spec_id.clone(), w.expires_at),
1687            Some(_) => {
1688                drop(lock);
1689                let err_msg = "Pod not owned by requester";
1690                warn!("{}: vmid={}", err_msg, vmid);
1691                nostr
1692                    .send_error_response(requester_pubkey, "not_owner", err_msg, None, message_type)
1693                    .await?;
1694                return Ok(());
1695            }
1696            None => {
1697                drop(lock);
1698                let err_msg = format!("Pod {} not found", request.pod_npub);
1699                warn!("{}", err_msg);
1700                nostr
1701                    .send_error_response(
1702                        requester_pubkey,
1703                        "not_found",
1704                        &err_msg,
1705                        None,
1706                        message_type,
1707                    )
1708                    .await?;
1709                return Ok(());
1710            }
1711        }
1712    };
1713
1714    if current_expires_at <= now {
1715        let err_msg = format!(
1716            "Pod {} lease has already expired; spawn a new pod instead",
1717            request.pod_npub
1718        );
1719        warn!("{}", err_msg);
1720        nostr
1721            .send_error_response(
1722                requester_pubkey,
1723                "lease_expired",
1724                &err_msg,
1725                None,
1726                message_type,
1727            )
1728            .await?;
1729        return Ok(());
1730    }
1731
1732    let spec = match config.specs.iter().find(|s| s.id == spec_id) {
1733        Some(s) => s.clone(),
1734        None => {
1735            // Spec referenced by the workload no longer exists in
1736            // config — provider misconfiguration. Refuse the topup
1737            // rather than silently mis-billing.
1738            let err_msg = format!(
1739                "Pod {} references unknown spec `{}`; provider misconfiguration",
1740                request.pod_npub, spec_id
1741            );
1742            error!("{}", err_msg);
1743            nostr
1744                .send_error_response(
1745                    requester_pubkey,
1746                    "spec_unavailable",
1747                    &err_msg,
1748                    None,
1749                    message_type,
1750                )
1751                .await?;
1752            return Ok(());
1753        }
1754    };
1755
1756    // 2. Redeem the topup token (no workloads lock held).
1757    let payment_msats = match validate_and_redeem(
1758        redeemer,
1759        &config.whitelisted_mints,
1760        &request.cashu_token,
1761    )
1762    .await
1763    {
1764        Ok(v) => v,
1765        Err(e) => {
1766            let (error_type, err_msg) = redeem_error_to_response(&e);
1767            error!("Topup redemption failed: {}", err_msg);
1768            nostr
1769                .send_error_response(requester_pubkey, error_type, &err_msg, None, message_type)
1770                .await?;
1771            return Ok(());
1772        }
1773    };
1774
1775    let extension_secs = payment_msats / spec.rate_msats_per_sec;
1776    if extension_secs == 0 {
1777        let err_msg = format!(
1778            "Insufficient topup: {} msats buys 0 seconds at {} msats/sec",
1779            payment_msats, spec.rate_msats_per_sec
1780        );
1781        warn!("{}", err_msg);
1782        nostr
1783            .send_error_response(
1784                requester_pubkey,
1785                "insufficient_payment",
1786                &err_msg,
1787                None,
1788                message_type,
1789            )
1790            .await?;
1791        return Ok(());
1792    }
1793
1794    // 3. Apply the extension under the workloads lock. We re-check
1795    //    ownership and existence after re-locking to defend against
1796    //    cleanup having run between our snapshot and now.
1797    let new_expires_at = {
1798        let mut lock = workloads.lock().await;
1799        match lock.get_mut(&vmid) {
1800            Some(w) if w.owner_npub == requester_pubkey => {
1801                w.expires_at = w.expires_at.saturating_add(extension_secs);
1802                w.expires_at
1803            }
1804            _ => {
1805                // Vanished or ownership changed between snapshots.
1806                // The token is already spent at the mint — this is a
1807                // race the consumer should retry by spawning a new
1808                // pod. We surface a distinct error so the CLI can
1809                // explain it.
1810                drop(lock);
1811                let err_msg =
1812                    "Pod was cleaned up before topup could be applied; token has been spent";
1813                error!("{}: vmid={}", err_msg, vmid);
1814                nostr
1815                    .send_error_response(requester_pubkey, "race_lost", err_msg, None, message_type)
1816                    .await?;
1817                return Ok(());
1818            }
1819        }
1820    };
1821
1822    let new_expires_dt =
1823        chrono::DateTime::from_timestamp(new_expires_at as i64, 0).unwrap_or_default();
1824    let response = TopUpResponseContent {
1825        success: true,
1826        pod_npub: request.pod_npub.clone(),
1827        extended_duration_seconds: extension_secs,
1828        new_expires_at: new_expires_dt.to_rfc3339(),
1829        message: format!(
1830            "Lease extended by {}s ({} msats @ {} msats/sec)",
1831            extension_secs, payment_msats, spec.rate_msats_per_sec
1832        ),
1833    };
1834
1835    nostr
1836        .send_topup_response_private_message(requester_pubkey, response, message_type)
1837        .await?;
1838
1839    info!(
1840        "Topup applied to {}: +{}s (now expires at {})",
1841        request.pod_npub, extension_secs, new_expires_at
1842    );
1843    Ok(())
1844}
1845
1846/// Generate a 16-character alphanumeric SSH password. Lives here
1847/// (rather than `sidecar_service`) so the Nostr-DM canonical path
1848/// doesn't depend on the legacy K8s pipeline that Unit 7 gates
1849/// behind the `kubernetes` Cargo feature.
1850fn generate_password() -> String {
1851    use rand::Rng;
1852    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1853    let mut rng = rand::thread_rng();
1854    (0..16)
1855        .map(|_| {
1856            let idx = rng.gen_range(0..CHARSET.len());
1857            CHARSET[idx] as char
1858        })
1859        .collect()
1860}
1861
/// Map a pod identifier produced by the spawn handler back to its
/// internal vmid.
///
/// Two spellings are accepted:
/// - `container-<vmid>`, the `pod_npub` that AccessDetailsContent hands
///   to the consumer;
/// - a bare numeric vmid, for callers that already have it.
///
/// Returns `None` when the (possibly prefix-stripped) remainder is not
/// a valid `u32`.
pub fn parse_pod_npub(pod_npub: &str) -> Option<u32> {
    let digits = pod_npub.strip_prefix("container-").unwrap_or(pod_npub);
    digits.parse::<u32>().ok()
}
1873
1874/// Translate a `RedeemError` into the `(error_type, message)` shape the
1875/// Nostr error-response uses. The error-type strings are stable so
1876/// consumers can reason about them programmatically (retry on
1877/// `network`, give up on `already_spent`, etc.).
1878fn redeem_error_to_response(err: &RedeemError) -> (&'static str, String) {
1879    match err {
1880        RedeemError::InvalidToken(msg) => {
1881            ("invalid_token", format!("Invalid Cashu token: {}", msg))
1882        }
1883        RedeemError::NonWhitelistedMint { mint_url } => (
1884            "non_whitelisted_mint",
1885            format!("Mint {} is not accepted by this provider", mint_url),
1886        ),
1887        RedeemError::AlreadySpent => (
1888            "token_already_spent",
1889            "This Cashu token has already been spent at the mint".to_string(),
1890        ),
1891        RedeemError::Pending => (
1892            "token_pending",
1893            "Token is pending at the mint; retry shortly".to_string(),
1894        ),
1895        RedeemError::Network(msg) => (
1896            "mint_network_error",
1897            format!("Could not reach mint: {}", msg),
1898        ),
1899        RedeemError::UnsupportedUnit(unit) => (
1900            "unsupported_unit",
1901            format!("Token unit {} is not supported", unit),
1902        ),
1903        RedeemError::MintError(msg) => ("mint_error", format!("Mint rejected redemption: {}", msg)),
1904    }
1905}
1906
1907/// Handle a status request
1908async fn handle_status_request(
1909    config: &ProviderConfig,
1910    nostr: &NostrRelaySubscriber,
1911    workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
1912    requester_pubkey: &str,
1913    message_type: &str,
1914    request: StatusRequestContent,
1915) -> Result<()> {
1916    info!(
1917        "Processing status request for pod {} from {}",
1918        request.pod_id, requester_pubkey
1919    );
1920
1921    // 1. Try to find the workload by ID (which could be vmid)
1922    let vmid = request.pod_id.parse::<u32>().ok();
1923
1924    let workload = {
1925        let lock = workloads.lock().await;
1926        if let Some(vmid) = vmid {
1927            lock.get(&vmid).cloned()
1928        } else {
1929            // If not a number, maybe it's a pod_npub? (not yet implemented in tracking, but we search by owner for now)
1930            lock.values()
1931                .find(|w| w.owner_npub == request.pod_id || w.owner_npub == requester_pubkey)
1932                .cloned()
1933        }
1934    };
1935
1936    let workload = match workload {
1937        Some(w) => w,
1938        None => {
1939            let err_msg = format!(
1940                "Workload {} not found or you don't have access",
1941                request.pod_id
1942            );
1943            warn!("{}", err_msg);
1944            nostr
1945                .send_error_response(requester_pubkey, "not_found", &err_msg, None, message_type)
1946                .await?;
1947            return Ok(());
1948        }
1949    };
1950
1951    // 2. Prepare response
1952    let now = std::time::SystemTime::now()
1953        .duration_since(std::time::UNIX_EPOCH)?
1954        .as_secs();
1955
1956    let time_remaining = workload.expires_at.saturating_sub(now);
1957    let status = if time_remaining == 0 {
1958        "Expired"
1959    } else {
1960        "Running"
1961    };
1962
1963    let expires_dt =
1964        chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
1965
1966    // Look up spec for actual resource values
1967    let spec = config.specs.iter().find(|s| s.id == workload.spec_id);
1968    let cpu = spec.map(|s| s.cpu_millicores).unwrap_or(1000);
1969    let mem = spec.map(|s| s.memory_mb).unwrap_or(1024);
1970    let host_port = match config.ssh_port_start {
1971        Some(start) => start + (workload.vmid - config.vmid_range_start) as u16,
1972        None => (30000 + (workload.vmid % 10000)) as u16,
1973    };
1974
1975    let response = StatusResponseContent {
1976        pod_id: workload.vmid.to_string(),
1977        status: status.to_string(),
1978        expires_at: expires_dt.to_rfc3339(),
1979        time_remaining_seconds: time_remaining,
1980        cpu_millicores: cpu,
1981        memory_mb: mem,
1982        ssh_host: config.public_ip.clone(),
1983        ssh_port: host_port,
1984        ssh_username: "root".to_string(),
1985    };
1986
1987    nostr
1988        .send_status_response(requester_pubkey, response, message_type)
1989        .await?;
1990
1991    info!("Status response sent for workload {}", workload.vmid);
1992    Ok(())
1993}
1994
1995/// Load provider config from file
1996pub fn load_config(path: &str) -> Result<ProviderConfig> {
1997    let content =
1998        std::fs::read_to_string(path).context(format!("Failed to read config file: {}", path))?;
1999
2000    serde_json::from_str(&content).context("Failed to parse provider config")
2001}
2002
2003/// Save provider config to file
2004pub fn save_config(path: &str, config: &ProviderConfig) -> Result<()> {
2005    let content = serde_json::to_string_pretty(config)?;
2006    std::fs::write(path, content).context(format!("Failed to write config file: {}", path))?;
2007    Ok(())
2008}
2009
/// Per-standby ordered backoff window, in seconds. After observing a
/// revocation, the standby at index `i` waits
/// `i * STANDBY_PROMOTION_DELAY_SECS`. It then checks for a peer's
/// `StandbyPromotionAnnouncement` and, if none is found, spawns the
/// container locally (see `schedule_standby_promotion`).
///
/// The single-writer guarantee is best-effort. It only holds if the
/// lower-indexed standby's announcement is visible on the relays
/// before the next standby's window opens. A brief two-Live window can
/// still occur in three cases:
/// - promotion takes longer than this window;
/// - the announcement publish fails;
/// - the announcement query fails (promotion then proceeds anyway).
///
/// We accept that as a v1 trade-off for the workloads where warm
/// standby makes sense. Relays are idempotent; databases need deeper
/// coordination than v1 ships.
const STANDBY_PROMOTION_DELAY_SECS: u64 = 30;
2020
2021/// Self-role detection for a warm-standby spawn. Reads the request's
2022/// `replication.standby_providers` and `primary_npub`, compares to
2023/// `self_npub`, returns the role this provider should take.
2024///
2025/// Returns `WarmStandbyRole::Primary` for non-WarmStandby requests
2026/// too — which is correct: in the single-replication path the
2027/// "primary" is just "the one provider running this workload."
2028fn compute_warm_standby_role(
2029    self_npub: &str,
2030    request: &EncryptedSpawnPodRequest,
2031) -> WarmStandbyRole {
2032    use crate::durable_workload::ReplicationMode;
2033    match request.replication.as_ref() {
2034        Some(ReplicationMode::WarmStandby { standby_providers }) => {
2035            let primary = request.primary_npub.as_deref().unwrap_or("");
2036            warm_standby_role(self_npub, primary, standby_providers)
2037        }
2038        // No-replication / Checkpointed → there's only one provider
2039        // running this; treat it as primary so the existing flow
2040        // is unchanged.
2041        _ => WarmStandbyRole::Primary,
2042    }
2043}
2044
2045/// Spawn the standby promotion task. Runs on its own tokio task so
2046/// the request handler returns immediately. Sleeps for the
2047/// per-standby ordered backoff, then:
2048///   1. Pre-emption check — query for any
2049///      `StandbyPromotionAnnouncement` event for this workload_id
2050///      authored by a peer standby. If one exists, drop the slot
2051///      without spawning (a peer beat us to it).
2052///   2. Spawn the container locally.
2053///   3. Publish a `StandbyPromotionAnnouncement` so higher-indexed
2054///      peers' pre-emption check finds it and they back off.
2055fn schedule_standby_promotion(
2056    backend: Arc<dyn ComputeBackend>,
2057    workloads: Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
2058    state_machine: Arc<Mutex<WorkloadStateMachine>>,
2059    standby_slots: Arc<Mutex<HashMap<String, StandbySlot>>>,
2060    nostr: NostrRelaySubscriber,
2061    slot: StandbySlot,
2062) {
2063    let delay_secs = (slot.standby_index as u64).saturating_mul(STANDBY_PROMOTION_DELAY_SECS);
2064    let workload_id = slot.workload_id.clone();
2065    let standby_index = slot.standby_index;
2066    info!(
2067        "Scheduling standby promotion for workload {} after {}s backoff (standby index {})",
2068        workload_id, delay_secs, standby_index
2069    );
2070    tokio::spawn(async move {
2071        if delay_secs > 0 {
2072            tokio::time::sleep(std::time::Duration::from_secs(delay_secs)).await;
2073        }
2074
2075        // Re-check the slot is still ours to claim. If a lower-
2076        // indexed standby already promoted, it should have removed
2077        // its slot — but each standby manages only its OWN slots,
2078        // not its peers'. So this check only guards against double-
2079        // firing on the same provider (e.g. duplicate revocation
2080        // events from multiple relays).
2081        let still_present = {
2082            let mut slots = standby_slots.lock().await;
2083            slots.remove(&workload_id)
2084        };
2085        let slot = match still_present {
2086            Some(s) => s,
2087            None => {
2088                debug!(
2089                    "Standby slot for workload {} already drained; skipping promotion",
2090                    workload_id
2091                );
2092                return;
2093            }
2094        };
2095
2096        // Pre-emption check: has a peer standby already promoted
2097        // for this exact workload_id? Query the dedicated
2098        // `StandbyPromotionAnnouncement` event kind (38386), which
2099        // is published exactly once per promotion. Heartbeats can't
2100        // serve this role: every standby provider runs a periodic
2101        // heartbeat loop regardless of promotion state, so a fresh
2102        // heartbeat from a peer means "peer is online", NOT "peer
2103        // promoted". The announcement event is unambiguous.
2104        if !slot.peer_standby_npubs.is_empty() {
2105            match nostr
2106                .query_standby_promotion_announcements(&slot.workload_id, &slot.peer_standby_npubs)
2107                .await
2108            {
2109                Ok(Some(announcement)) => {
2110                    info!(
2111                        "Peer standby {} already promoted workload {} at {}; dropping slot without spawning",
2112                        announcement.new_primary_npub,
2113                        announcement.workload_id,
2114                        announcement.promoted_at
2115                    );
2116                    return;
2117                }
2118                Ok(None) => {
2119                    // No peer has announced — proceed with promotion.
2120                }
2121                Err(e) => {
2122                    warn!(
2123                        "Failed to query peer promotion announcements for workload {}: {}; proceeding with promotion (best-effort)",
2124                        slot.workload_id, e
2125                    );
2126                }
2127            }
2128        }
2129
2130        info!(
2131            "Promoting standby slot {} → primary (vmid {})",
2132            slot.workload_id, slot.container_config.id
2133        );
2134        if let Err(e) = backend.create_container(&slot.container_config).await {
2135            error!(
2136                "Standby promotion failed for workload {}: backend error: {}",
2137                slot.workload_id, e
2138            );
2139            // Re-insert the slot so a later revocation retry could
2140            // pick it up. Operator-level alerting is left to the
2141            // logs; the consumer-facing observability story for
2142            // failed promotion is a follow-up.
2143            standby_slots
2144                .lock()
2145                .await
2146                .insert(slot.workload_id.clone(), slot);
2147            return;
2148        }
2149
2150        let now = std::time::SystemTime::now()
2151            .duration_since(std::time::UNIX_EPOCH)
2152            .map(|d| d.as_secs())
2153            .unwrap_or(0);
2154        let workload = WorkloadInfo {
2155            vmid: slot.container_config.id,
2156            workload_type: "lxc".to_string(),
2157            spec_id: slot.spec_id.clone(),
2158            created_at: now,
2159            expires_at: slot.expires_at,
2160            owner_npub: slot.owner_npub.clone(),
2161            consumer_workload_id: Some(slot.workload_id.clone()),
2162            // After promotion this provider IS the primary — record
2163            // replication as None so the orchestrator doesn't try
2164            // to re-emit a revocation if quorum is lost again
2165            // (we'd need a fresh standby topology for that, which
2166            // the consumer hasn't provided post-promotion).
2167            replication: crate::durable_workload::ReplicationMode::None,
2168            restart_policy: crate::durable_workload::RestartPolicy::default(),
2169            state_uri: None,
2170        };
2171        let active_workloads_count = {
2172            let mut w = workloads.lock().await;
2173            w.insert(slot.container_config.id, workload.clone());
2174            w.len() as u32
2175        };
2176
2177        state_machine
2178            .lock()
2179            .await
2180            .track(crate::durable_workload::DurableWorkload {
2181                workload_id: slot.container_config.id,
2182                provider_npub: String::new(), // filled by orchestrator on first heartbeat tick
2183                state: crate::durable_workload::WorkloadState::Provisioning { since: now },
2184                replication: workload.replication.clone(),
2185                restart_policy: workload.restart_policy,
2186                state_uri: workload.state_uri.clone(),
2187                created_at: now,
2188                expires_at: workload.expires_at,
2189            });
2190
2191        info!(
2192            "Standby promotion complete: workload {} now running locally (vmid {})",
2193            slot.workload_id, slot.container_config.id
2194        );
2195        let _ = active_workloads_count; // currently unused now that we publish announcement instead of heartbeat
2196
2197        // Publish a `StandbyPromotionAnnouncement` IMMEDIATELY so
2198        // higher-indexed peer standbys see it on their next
2199        // pre-emption check (which fires after their per-index
2200        // backoff). Without this announcement, peers would either
2201        // produce a duplicate primary (no signal) or both drop
2202        // their slots (heartbeat-based dedup, since every peer
2203        // emits its own periodic heartbeat regardless of
2204        // promotion state).
2205        let announcement = StandbyPromotionAnnouncementContent {
2206            workload_id: slot.workload_id.clone(),
2207            new_primary_npub: nostr.get_service_public_key(),
2208            promoted_at: now,
2209            version: crate::nostr::SCHEMA_VERSION,
2210        };
2211        if let Err(e) = nostr
2212            .publish_standby_promotion_announcement(announcement)
2213            .await
2214        {
2215            warn!(
2216                "Post-promotion announcement publish failed for workload {}: {}; peer standbys will not back off and may produce a duplicate primary",
2217                slot.workload_id, e
2218            );
2219        }
2220    });
2221}
2222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::durable_workload::ReplicationMode;

    /// Builds a spawn request where only the replication mode and the
    /// designated primary npub vary. Every other field is a fixed
    /// placeholder, so role-computation tests depend solely on the
    /// topology passed in.
    fn req_with(
        replication: Option<ReplicationMode>,
        primary_npub: Option<&str>,
    ) -> EncryptedSpawnPodRequest {
        EncryptedSpawnPodRequest {
            cashu_token: "tok".to_string(),
            pod_spec_id: Some("basic".to_string()),
            pod_image: "ubuntu:22.04".to_string(),
            ssh_username: "u".to_string(),
            ssh_password: "p".to_string(),
            template_slug: None,
            replication,
            primary_npub: primary_npub.map(str::to_string),
            workload_id: Some("wid-test".to_string()),
            volume_encryption: None,
        }
    }

    #[test]
    fn role_is_primary_for_non_warm_standby() {
        // No-replication request → role is Primary regardless of
        // self_npub (existing single-provider behavior unchanged).
        let r = compute_warm_standby_role("npub1self", &req_with(None, None));
        assert_eq!(r, WarmStandbyRole::Primary);

        let r = compute_warm_standby_role(
            "npub1self",
            &req_with(Some(ReplicationMode::Checkpointed), None),
        );
        assert_eq!(r, WarmStandbyRole::Primary);
    }

    #[test]
    fn role_is_primary_when_self_is_designated_primary() {
        let r = compute_warm_standby_role(
            "npub1primary",
            &req_with(
                Some(ReplicationMode::WarmStandby {
                    standby_providers: vec!["npub1b".to_string(), "npub1c".to_string()],
                }),
                Some("npub1primary"),
            ),
        );
        assert_eq!(r, WarmStandbyRole::Primary);
    }

    #[test]
    fn role_is_standby_with_correct_index_when_self_in_list() {
        let r = compute_warm_standby_role(
            "npub1c",
            &req_with(
                Some(ReplicationMode::WarmStandby {
                    standby_providers: vec!["npub1b".to_string(), "npub1c".to_string()],
                }),
                Some("npub1primary"),
            ),
        );
        assert_eq!(r, WarmStandbyRole::Standby { index: 1, count: 2 });
    }

    #[test]
    fn role_is_not_addressed_when_self_unknown_to_topology() {
        let r = compute_warm_standby_role(
            "npub1stranger",
            &req_with(
                Some(ReplicationMode::WarmStandby {
                    standby_providers: vec!["npub1b".to_string(), "npub1c".to_string()],
                }),
                Some("npub1primary"),
            ),
        );
        assert_eq!(r, WarmStandbyRole::NotAddressed);
    }

    // ----- standby watchdog: primary_is_silent decision -----
    //
    // This is the pure-function gate the watchdog uses to decide whether
    // to fire `schedule_standby_promotion` for a slot. The edge cases are
    // pinned here so a future refactor can't silently flip the semantics
    // that the warm-standby crash-detection promise rests on.

    #[test]
    fn fresh_primary_heartbeat_is_not_silent() {
        // Heartbeat 60s old, threshold 180s — comfortably alive.
        assert!(!primary_is_silent(1_000_000, 999_940, 180));
    }

    #[test]
    fn primary_at_threshold_is_silent() {
        // Exactly 180s old vs a 180s threshold — the boundary is
        // inclusive, so promotion fires.
        assert!(primary_is_silent(1_000_000, 999_820, 180));
        // 179s old — still alive (one second of grace).
        assert!(!primary_is_silent(1_000_000, 999_821, 180));
    }

    #[test]
    fn unset_baseline_is_not_silent() {
        // baseline == 0 is the "unknown / caller forgot" sentinel.
        // The watchdog must always pass either a real last-heartbeat
        // timestamp or the slot's reservation timestamp. A 0 here
        // means the caller mis-wired the lookup. Returning false (alive)
        // is the safe failure mode: better a missed promotion than a
        // spurious one against a healthy primary.
        //
        // Elapsed-since-epoch is ~1e6s here, far past the 180s
        // threshold, so only the sentinel guard can keep this false.
        assert!(!primary_is_silent(1_000_000, 0, 180));
        // The tightest threshold makes any elapsed time count as silence,
        // so this pins the guard rather than the subtraction. Note that
        // a case like `(50, 0, 180)` would NOT exercise the guard:
        // 50 < 180 reads as "alive" even without it.
        assert!(!primary_is_silent(1_000_000, 0, 1));
    }

    #[test]
    fn fresh_slot_within_grace_window_is_not_silent() {
        // No heartbeat observed yet, so baseline = slot.created_at.
        // 30s after reservation there are still 150s of grace, and the
        // primary is treated as alive. It just hasn't published its first
        // heartbeat to us yet, or that heartbeat landed on a relay we're
        // not subscribed to.
        let created_at = 1_000_000;
        let now = created_at + 30;
        assert!(!primary_is_silent(now, created_at, 180));
    }

    #[test]
    fn fresh_slot_past_grace_window_is_silent() {
        // No heartbeat observed yet AND we've waited the full
        // silence window since slot reservation. Either the primary
        // crashed before publishing any heartbeat we could see, or
        // it never came up. Either way, promote.
        let created_at = 1_000_000;
        let now = created_at + 180;
        assert!(primary_is_silent(now, created_at, 180));
    }

    #[test]
    fn clock_skew_underflow_does_not_panic_or_misfire() {
        // baseline > now: the clock went backwards, or a relay returned
        // a future-stamped event. The primary must be treated as alive:
        // a false negative (no promotion) is better than a panic.
        assert!(!primary_is_silent(100, 200, 180));
    }

    // ----- standby slot expiry: cleanup_loop selection -----

    /// Builds a standby slot with placeholder topology and container
    /// config. Only `workload_id` and `expires_at` vary between tests.
    fn make_slot(workload_id: &str, expires_at: u64) -> StandbySlot {
        StandbySlot {
            workload_id: workload_id.to_string(),
            primary_npub: "npub1primary".to_string(),
            standby_index: 0,
            standby_count: 1,
            container_config: ContainerConfig {
                id: 1,
                name: "test".to_string(),
                image: "img".to_string(),
                cpu_cores: 1,
                memory_mb: 1024,
                storage_gb: 10,
                password: "p".to_string(),
                ssh_key: None,
                host_port: None,
                template_ports: vec![],
                template_env: HashMap::new(),
                extra_runtime_args: vec![],
                data_path: None,
                volume_encryption_key: None,
            },
            spec_id: "basic".to_string(),
            expires_at,
            owner_npub: "npub1owner".to_string(),
            created_at: 0,
            peer_standby_npubs: vec![],
        }
    }

    /// Returns the map keys of every slot whose lease has ended at `now`
    /// (`expires_at <= now`), in `HashMap` iteration order (unspecified).
    ///
    /// NOTE(review): this re-states the expiry predicate locally instead
    /// of calling production code. These tests therefore pin the intended
    /// semantics but will not catch drift in `cleanup_loop` itself.
    /// Consider extracting a shared helper that both call.
    fn select_expired(slots: &HashMap<String, StandbySlot>, now: u64) -> Vec<String> {
        slots
            .iter()
            .filter(|(_, slot)| slot.expires_at <= now)
            .map(|(workload_id, _)| workload_id.clone())
            .collect()
    }

    #[test]
    fn select_expired_returns_only_past_expiry_slots() {
        let mut slots = HashMap::new();
        slots.insert("active".to_string(), make_slot("active", 2_000));
        slots.insert("expired".to_string(), make_slot("expired", 999));
        let mut expired = select_expired(&slots, 1_000);
        // HashMap order is unspecified; sort before comparing.
        expired.sort();
        assert_eq!(expired, vec!["expired".to_string()]);
    }

    #[test]
    fn select_expired_treats_expires_at_equals_now_as_expired() {
        // A slot whose lease ends *exactly now* should be reaped on
        // this tick, not held over for the next 30s tick. expires_at
        // is the FIRST instant the lease no longer applies.
        let mut slots = HashMap::new();
        slots.insert("boundary".to_string(), make_slot("boundary", 1_000));
        let expired = select_expired(&slots, 1_000);
        assert_eq!(expired, vec!["boundary".to_string()]);
    }

    #[test]
    fn select_expired_returns_empty_when_no_slots_expired() {
        let mut slots = HashMap::new();
        slots.insert("a".to_string(), make_slot("a", 9_999));
        slots.insert("b".to_string(), make_slot("b", 9_999));
        assert!(select_expired(&slots, 1_000).is_empty());
    }
}