1use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tracing::{debug, error, info, warn};
14
15use crate::cashu::{
16 derive_seed_from_nostr_key, validate_and_redeem, CdkRedeemer, MintRedeemer, RedeemError,
17};
18use crate::compute::{ComputeBackend, ContainerConfig, PortMapping};
19use crate::docker::DockerBackend;
20use crate::durable_workload::{
21 DurableWorkload, HeartbeatObservation, QuorumConfig, StateMachineEvent, WorkloadState,
22 WorkloadStateMachine,
23};
24use crate::lxd::LxdBackend;
25use crate::nostr::{
26 parse_private_message_content, warm_standby_role, AccessDetailsContent, CapacityInfo,
27 EncryptedSpawnPodRequest, EncryptedTopUpPodRequest, ErrorResponseContent, HeartbeatContent,
28 LeaseRevocationContent, NostrRelaySubscriber, PodSpec, PrivateRequest, ProviderOfferContent,
29 RelayConfig, StandbyPromotionAnnouncementContent, StatusRequestContent, StatusResponseContent,
30 TopUpResponseContent, WarmStandbyRole,
31};
32use crate::proxmox::{ProxmoxBackend, ProxmoxClient};
33use crate::templates::{TemplateDefinition, TemplateName};
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
36pub enum BackendType {
37 Proxmox,
38 LXD,
39 Docker,
44 Kvm,
53}
54
55impl Default for BackendType {
56 fn default() -> Self {
57 Self::Proxmox
58 }
59}
60
/// Runtime configuration for a provider node (serde-(de)serializable).
///
/// Fields carrying `#[serde(default)]` (or a named default function) may be omitted when
/// deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderConfig {
    /// Which compute backend `ProviderService::new` constructs.
    #[serde(default)]
    pub backend_type: BackendType,

    /// Proxmox API base URL, passed to `ProxmoxClient::new`.
    pub proxmox_url: String,
    /// Proxmox API token id, passed to `ProxmoxClient::new`.
    pub proxmox_token_id: String,
    /// Proxmox API token secret, passed to `ProxmoxClient::new`.
    pub proxmox_token_secret: String,
    /// Proxmox node name, passed to `ProxmoxClient::new`.
    pub proxmox_node: String,
    /// Storage name handed to the Proxmox backend (also reused by the LXD backend).
    pub proxmox_storage: String,
    /// Template handed to the Proxmox backend.
    pub proxmox_template: String,
    /// Network bridge handed to the Proxmox backend (also reused by the LXD backend).
    pub proxmox_bridge: String,
    /// Lower bound of the workload id range passed to `find_available_id`; also the base
    /// offset when deriving the SSH host port from `ssh_port_start`.
    pub vmid_range_start: u32,
    /// Upper bound of the workload id range passed to `find_available_id`.
    pub vmid_range_end: u32,

    /// Provider's Nostr private key: used by the relay subscriber and to derive the Cashu
    /// wallet seed.
    pub nostr_private_key: String,
    /// Relay URLs the provider publishes to and subscribes on.
    pub nostr_relays: Vec<String>,

    /// Human-readable provider name; advertised as the offer's `hostname`.
    pub provider_name: String,
    /// Optional location string included in the provider offer.
    pub provider_location: Option<String>,
    /// Address returned to consumers as `host_address` in access details.
    pub public_ip: String,
    /// Capability strings included in the provider offer.
    pub capabilities: Vec<String>,

    /// Offered pod tiers. A spawn request naming an unknown tier falls back to the first one.
    pub specs: Vec<PodSpec>,
    /// Cashu mint URLs accepted for payment; also advertised in the offer.
    pub whitelisted_mints: Vec<String>,

    /// Seconds between heartbeat publications.
    pub heartbeat_interval_secs: u64,
    /// Spawn requests whose payment buys fewer seconds than this are rejected.
    pub minimum_duration_seconds: u64,

    /// Tunnel toggle. NOTE(review): not read by the code visible here — confirm its consumer.
    #[serde(default)]
    pub tunnel_enabled: bool,
    /// Tunnel interface name. NOTE(review): not read by the code visible here.
    #[serde(default)]
    pub tunnel_interface: Option<String>,
    /// First SSH host port. When set, a workload's port is
    /// `ssh_port_start + (id - vmid_range_start)`; otherwise `30000 + id % 10000` is used.
    #[serde(default)]
    pub ssh_port_start: Option<u16>,
    /// Last SSH host port. NOTE(review): not enforced by the code visible here.
    #[serde(default)]
    pub ssh_port_end: Option<u16>,

    /// Path of the redb database backing the Cashu wallet.
    #[serde(default = "default_cashu_wallet_db_path")]
    pub cashu_wallet_db_path: String,
}
114
/// Serde default for `ProviderConfig::cashu_wallet_db_path`: a redb file relative to the
/// process working directory.
fn default_cashu_wallet_db_path() -> String {
    String::from("./paygress-cashu-wallet.redb")
}
118
119impl Default for ProviderConfig {
120 fn default() -> Self {
121 Self {
122 backend_type: BackendType::Proxmox,
123 proxmox_url: "https://localhost:8006/api2/json".to_string(),
124 proxmox_token_id: "root@pam!paygress".to_string(),
125 proxmox_token_secret: String::new(),
126 proxmox_node: "pve".to_string(),
127 proxmox_storage: "local-lvm".to_string(),
128 proxmox_template: "local:vztmpl/ubuntu-22.04-standard.tar.zst".to_string(),
129 proxmox_bridge: "vmbr0".to_string(),
130 vmid_range_start: 1000,
131 vmid_range_end: 1999,
132 nostr_private_key: String::new(),
133 nostr_relays: vec![
134 "wss://relay.damus.io".to_string(),
135 "wss://nos.lol".to_string(),
136 ],
137 provider_name: "Paygress Provider".to_string(),
138 provider_location: None,
139 public_ip: "127.0.0.1".to_string(),
140 capabilities: vec!["lxc".to_string()],
141 specs: vec![PodSpec {
142 id: "basic".to_string(),
143 name: "Basic".to_string(),
144 description: "1 vCPU, 1GB RAM".to_string(),
145 cpu_millicores: 1000,
146 memory_mb: 1024,
147 rate_msats_per_sec: 50,
148 }],
149 whitelisted_mints: vec!["https://mint.minibits.cash".to_string()],
150 heartbeat_interval_secs: 60,
151 minimum_duration_seconds: 60,
152 tunnel_enabled: false,
153 tunnel_interface: None,
154 ssh_port_start: None,
155 ssh_port_end: None,
156 cashu_wallet_db_path: default_cashu_wallet_db_path(),
157 }
158 }
159}
160
/// Bookkeeping for a workload running on this provider, stored in
/// `ProviderService::active_workloads` keyed by `vmid`.
///
/// NOTE(review): `#[serde(default)]` only affects deserialization. This type derives only
/// `Serialize`, so those attributes currently have no effect.
#[derive(Debug, Clone, Serialize)]
pub struct WorkloadInfo {
    /// Backend container/VM id obtained from `find_available_id` within the configured range.
    pub vmid: u32,
    /// Workload kind label; the spawn path currently always sets `"lxc"`.
    pub workload_type: String,
    /// Id of the `PodSpec` tier the workload was provisioned with.
    pub spec_id: String,
    /// Creation time, Unix seconds.
    pub created_at: u64,
    /// Expiry time, Unix seconds; the cleanup loop removes workloads once `expires_at <= now`.
    pub expires_at: u64,
    /// Requester public key as received on the request event.
    /// NOTE(review): compared elsewhere against a hex key, so it may be hex rather than bech32.
    pub owner_npub: String,

    /// Replication mode requested at spawn (default when the request omitted it).
    #[serde(default)]
    pub replication: crate::durable_workload::ReplicationMode,

    /// Restart policy; the spawn path currently uses the default.
    #[serde(default)]
    pub restart_policy: crate::durable_workload::RestartPolicy,

    /// Optional state location, forwarded in lease revocations.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state_uri: Option<String>,

    /// Consumer-assigned workload id (empty ids are stored as `None`). Used as the
    /// `workload_id` of lease revocations, with a `vmid-{vmid}` fallback.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub consumer_workload_id: Option<String>,
}
198
/// A warm-standby reservation held by this provider for another provider's primary workload.
///
/// Slots are stored in `ProviderService::standby_slots` keyed by `workload_id`. A promotion is
/// scheduled when a matching lease revocation arrives or when the watchdog sees the primary
/// go silent. The cleanup loop drops slots once `expires_at <= now`.
#[derive(Debug, Clone)]
pub struct StandbySlot {
    /// Consumer-assigned workload id shared by the primary and all standbys.
    pub workload_id: String,
    /// Primary provider key from the spawn request (empty if the request omitted it).
    /// Revocations whose primary differs from this are ignored.
    pub primary_npub: String,
    /// This provider's position among the standbys.
    pub standby_index: usize,
    /// Total number of standbys for the workload.
    pub standby_count: usize,
    /// Container configuration prepared at reservation time (presumably used on promotion).
    pub container_config: ContainerConfig,
    /// Id of the `PodSpec` tier that was paid for.
    pub spec_id: String,
    /// Reservation expiry, Unix seconds.
    pub expires_at: u64,
    /// Key of the consumer that requested the reservation.
    pub owner_npub: String,
    /// Reservation time, Unix seconds; the watchdog's silence baseline when no primary
    /// heartbeat is found.
    pub created_at: u64,
    /// The other standby providers from the replication set, excluding this provider.
    pub peer_standby_npubs: Vec<String>,
}
239
/// The provider daemon: advertises offers, answers requests and manages workload lifecycles.
///
/// All mutable state sits behind `Arc<tokio::sync::Mutex<_>>` so it can be shared with the
/// request-handler closure and the background loops started by `run`.
pub struct ProviderService {
    /// Static configuration.
    config: ProviderConfig,
    /// Compute backend selected by `config.backend_type`.
    backend: Arc<dyn ComputeBackend>,
    /// Nostr relay client used for publishing and subscribing.
    nostr: NostrRelaySubscriber,
    /// Cashu token redeemer backed by the local wallet database.
    redeemer: Arc<dyn MintRedeemer>,
    /// Workloads this provider runs as primary, keyed by backend vmid.
    active_workloads: Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
    /// Counters reported in the provider offer.
    stats: Arc<Mutex<ProviderStats>>,

    /// Durable-workload state machine, ticked by the orchestrator loop.
    state_machine: Arc<Mutex<WorkloadStateMachine>>,

    /// Heartbeat observations (one per accepting relay), filled by `send_heartbeat` and
    /// drained by each orchestrator tick.
    observation_buffer: Arc<Mutex<Vec<HeartbeatObservation>>>,

    /// Warm-standby reservations held for other primaries, keyed by consumer workload id.
    standby_slots: Arc<Mutex<HashMap<String, StandbySlot>>>,
}
270
/// Process-lifetime counters for this provider.
#[derive(Debug, Clone, Default)]
struct ProviderStats {
    /// Reported in the provider offer.
    /// NOTE(review): incremented both after a successful spawn and after a successful cleanup
    /// of an expired workload, so one workload may be counted twice — confirm intent.
    total_jobs_completed: u64,
    /// Service start time, Unix seconds.
    uptime_start: u64,
}
276
277impl ProviderService {
278 pub async fn new(config: ProviderConfig) -> Result<Self> {
280 let backend: Arc<dyn ComputeBackend> = match config.backend_type {
281 BackendType::Proxmox => {
282 let client = ProxmoxClient::new(
283 &config.proxmox_url,
284 &config.proxmox_token_id,
285 &config.proxmox_token_secret,
286 &config.proxmox_node,
287 )?;
288 Arc::new(ProxmoxBackend::new(
289 client,
290 &config.proxmox_storage,
291 &config.proxmox_bridge,
292 &config.proxmox_template,
293 ))
294 }
295 BackendType::LXD => Arc::new(LxdBackend::new(
296 &config.proxmox_storage, &config.proxmox_bridge, )),
299 BackendType::Docker => Arc::new(DockerBackend::new()),
300 BackendType::Kvm => {
301 if let Err(e) = crate::kvm::KvmBackend::check_kvm_available().await {
306 tracing::error!("KVM backend selected but unavailable: {}", e);
307 anyhow::bail!("KVM backend unavailable: {}", e);
308 }
309 Arc::new(crate::kvm::KvmBackend::new(crate::kvm::KvmConfig::default()))
310 }
311 };
312
313 let relay_config = RelayConfig {
315 relays: config.nostr_relays.clone(),
316 private_key: Some(config.nostr_private_key.clone()),
317 };
318 let nostr = NostrRelaySubscriber::new(relay_config).await?;
319
320 let wallet_db = cdk_redb::wallet::WalletRedbDatabase::new(std::path::Path::new(
326 &config.cashu_wallet_db_path,
327 ))
328 .map_err(|e| {
329 anyhow::anyhow!(
330 "failed to open cashu wallet database at {}: {}",
331 config.cashu_wallet_db_path,
332 e
333 )
334 })?;
335 let seed = derive_seed_from_nostr_key(&config.nostr_private_key);
336 let redeemer: Arc<dyn MintRedeemer> = Arc::new(CdkRedeemer::new(Arc::new(wallet_db), seed));
337
338 let now = std::time::SystemTime::now()
339 .duration_since(std::time::UNIX_EPOCH)?
340 .as_secs();
341
342 Ok(Self {
343 config,
344 backend,
345 nostr,
346 redeemer,
347 active_workloads: Arc::new(Mutex::new(HashMap::new())),
348 stats: Arc::new(Mutex::new(ProviderStats {
349 total_jobs_completed: 0,
350 uptime_start: now,
351 })),
352 state_machine: Arc::new(Mutex::new(WorkloadStateMachine::new(
353 QuorumConfig::default(),
354 ))),
355 observation_buffer: Arc::new(Mutex::new(Vec::new())),
356 standby_slots: Arc::new(Mutex::new(HashMap::new())),
357 })
358 }
359
360 pub fn get_npub(&self) -> String {
362 self.nostr.get_service_public_key()
363 }
364
365 pub async fn run(&self) -> Result<()> {
367 info!("🚀 Starting Paygress Provider Service");
368 info!("Provider: {}", self.config.provider_name);
369 info!("NPUB: {}", self.get_npub());
370
371 self.publish_offer().await?;
373
374 tokio::select! {
378 result = self.heartbeat_loop() => {
379 error!("Heartbeat loop exited: {:?}", result);
380 result
381 }
382 result = self.listen_for_requests() => {
383 error!("Request listener exited: {:?}", result);
384 result
385 }
386 result = self.cleanup_loop() => {
387 error!("Cleanup loop exited: {:?}", result);
388 result
389 }
390 result = self.orchestrator_loop() => {
391 error!("Orchestrator loop exited: {:?}", result);
392 result
393 }
394 result = self.standby_watchdog_loop() => {
395 error!("Standby watchdog loop exited: {:?}", result);
396 result
397 }
398 }
399 }
400
401 async fn publish_offer(&self) -> Result<()> {
403 let stats = self.stats.lock().await;
404
405 let offer = ProviderOfferContent {
406 provider_npub: self.get_npub(),
407 hostname: self.config.provider_name.clone(),
408 location: self.config.provider_location.clone(),
409 capabilities: self.config.capabilities.clone(),
410 specs: self.config.specs.clone(),
411 whitelisted_mints: self.config.whitelisted_mints.clone(),
412 uptime_percent: 100.0, total_jobs_completed: stats.total_jobs_completed,
414 api_endpoint: None, version: crate::nostr::SCHEMA_VERSION,
416 isolation_level: match self.config.backend_type {
422 BackendType::Kvm => crate::nostr::IsolationLevel::DedicatedHost,
423 BackendType::Proxmox | BackendType::LXD | BackendType::Docker => {
424 crate::nostr::IsolationLevel::SharedKernel
425 }
426 },
427 stake_proof: None,
428 };
429
430 self.nostr.publish_provider_offer(offer).await?;
431 Ok(())
432 }
433
434 async fn heartbeat_loop(&self) -> Result<()> {
436 let interval = tokio::time::Duration::from_secs(self.config.heartbeat_interval_secs);
437
438 loop {
439 if let Err(e) = self.send_heartbeat().await {
440 warn!("Failed to send heartbeat: {}", e);
441 }
442 tokio::time::sleep(interval).await;
443 }
444 }
445
446 async fn send_heartbeat(&self) -> Result<()> {
448 let workloads = self.active_workloads.lock().await;
449
450 let capacity = match self.backend.get_node_status().await {
452 Ok(status) => CapacityInfo {
453 cpu_available: ((1.0 - status.cpu_usage) * 100000.0) as u64, memory_mb_available: status.memory_total.saturating_sub(status.memory_used)
455 / (1024 * 1024),
456 storage_gb_available: status.disk_total.saturating_sub(status.disk_used)
457 / (1024 * 1024 * 1024),
458 },
459 Err(e) => {
460 warn!("Failed to get node status: {}", e);
461 CapacityInfo {
462 cpu_available: 0,
463 memory_mb_available: 0,
464 storage_gb_available: 0,
465 }
466 }
467 };
468
469 let now = std::time::SystemTime::now()
470 .duration_since(std::time::UNIX_EPOCH)?
471 .as_secs();
472
473 let heartbeat = HeartbeatContent {
474 provider_npub: self.get_npub(),
475 timestamp: now,
476 active_workloads: workloads.len() as u32,
477 available_capacity: capacity,
478 version: crate::nostr::SCHEMA_VERSION,
479 };
480
481 let (_event_id, accepting_relays) = self.nostr.publish_heartbeat(heartbeat).await?;
482
483 if !accepting_relays.is_empty() {
490 let provider_npub = self.get_npub();
491 let mut buf = self.observation_buffer.lock().await;
492 for relay_url in accepting_relays {
493 buf.push(HeartbeatObservation {
494 provider_npub: provider_npub.clone(),
495 relay_url,
496 seen_at: now,
497 event_timestamp: now,
498 });
499 }
500 }
501
502 Ok(())
503 }
504
505 async fn listen_for_requests(&self) -> Result<()> {
507 info!("Listening for Paygress requests...");
508
509 let backend = self.backend.clone();
511 let config = self.config.clone();
512 let nostr = self.nostr.clone();
513 let redeemer = self.redeemer.clone();
514 let workloads = self.active_workloads.clone();
515 let stats = self.stats.clone();
516 let state_machine = self.state_machine.clone();
517 let standby_slots = self.standby_slots.clone();
518
519 self.nostr
520 .subscribe_to_pod_events(move |event| {
521 let backend = backend.clone();
522 let config = config.clone();
523 let nostr = nostr.clone();
524 let redeemer = redeemer.clone();
525 let workloads = workloads.clone();
526 let stats = stats.clone();
527 let state_machine = state_machine.clone();
528 let standby_slots = standby_slots.clone();
529
530 Box::pin(async move {
531 let my_pubkey = nostr.public_key().to_hex();
532 if event.pubkey == my_pubkey {
533 return Ok(());
534 }
535
536 debug!(
537 "Handler received event kind: {}, from: {}, message_type: {}",
538 event.kind, event.pubkey, event.message_type
539 );
540
541 if let Some(revocation) = crate::nostr::parse_revocation_event(&event) {
545 info!(
546 "Lease revocation observed: workload_id={}, primary={}, reason={}, state_uri={:?}, standbys={:?}",
547 revocation.workload_id,
548 revocation.primary_provider_npub,
549 revocation.reason,
550 revocation.state_uri,
551 revocation.standby_providers,
552 );
553 let workload_id = revocation.workload_id.clone();
559 let primary_npub = revocation.primary_provider_npub.clone();
560 let slot_opt = standby_slots.lock().await.get(&workload_id).cloned();
561 if let Some(slot) = slot_opt {
562 if slot.primary_npub != primary_npub {
563 warn!(
564 "Revocation primary_npub ({}) does not match slot's primary ({}); ignoring",
565 primary_npub, slot.primary_npub
566 );
567 return Ok(());
568 }
569 schedule_standby_promotion(
570 backend.clone(),
571 workloads.clone(),
572 state_machine.clone(),
573 standby_slots.clone(),
574 nostr.clone(),
575 slot,
576 );
577 } else {
578 debug!(
579 "Revocation workload_id={} did not match any local standby slot; ignoring",
580 workload_id
581 );
582 }
583 return Ok(());
584 }
585
586 let request_type = match parse_private_message_content(&event.content) {
588 Ok(req) => req,
589 Err(e) => {
590 warn!("Failed to parse request from {}: {}", event.pubkey, e);
591 let error = ErrorResponseContent {
592 error_type: "invalid_request".to_string(),
593 message: "Failed to parse request".to_string(),
594 details: Some(e.to_string()),
595 };
596 let _ = nostr
597 .send_error_response_private_message(
598 &event.pubkey,
599 error,
600 &event.message_type,
601 )
602 .await;
603 return Ok(());
604 }
605 };
606
607 debug!("Successfully parsed request metadata");
608
609 match request_type {
611 PrivateRequest::Spawn(spawn_req) => {
612 if let Err(e) = handle_spawn_request(
613 backend.as_ref(),
614 &config,
615 &nostr,
616 redeemer.as_ref(),
617 &workloads,
618 &stats,
619 &state_machine,
620 &standby_slots,
621 &event.pubkey,
622 &event.message_type,
623 spawn_req,
624 )
625 .await
626 {
627 error!("Failed to handle spawn request: {}", e);
628 }
629 }
630 PrivateRequest::Status(status_req) => {
631 if let Err(e) = handle_status_request(
632 &config,
633 &nostr,
634 &workloads,
635 &event.pubkey,
636 &event.message_type,
637 status_req,
638 )
639 .await
640 {
641 error!("Failed to handle status request: {}", e);
642 }
643 }
644 PrivateRequest::TopUp(topup_req) => {
645 if let Err(e) = handle_topup_request(
646 &config,
647 &nostr,
648 redeemer.as_ref(),
649 &workloads,
650 &event.pubkey,
651 &event.message_type,
652 topup_req,
653 )
654 .await
655 {
656 error!("Failed to handle topup request: {}", e);
657 }
658 }
659 }
660
661 Ok(())
662 })
663 })
664 .await?;
665
666 Ok(())
667 }
668
669 async fn orchestrator_loop(&self) -> Result<()> {
679 let interval = tokio::time::Duration::from_secs(15);
680 info!("Orchestrator loop starting (cadence: 15s)");
681
682 loop {
683 tokio::time::sleep(interval).await;
684
685 let now = std::time::SystemTime::now()
686 .duration_since(std::time::UNIX_EPOCH)?
687 .as_secs();
688
689 let observations: Vec<HeartbeatObservation> = {
695 let mut buf = self.observation_buffer.lock().await;
696 std::mem::take(&mut *buf)
697 };
698
699 let events = {
702 let mut sm = self.state_machine.lock().await;
703 sm.tick(now, &observations)
704 };
705
706 if events.is_empty() {
707 continue;
708 }
709
710 for event in events {
711 self.handle_state_machine_event(event, now).await;
712 }
713 }
714 }
715
716 async fn handle_state_machine_event(&self, event: StateMachineEvent, now: u64) {
720 match event {
721 StateMachineEvent::EnteredLive { workload_id } => {
722 info!("Workload {} entered Live", workload_id);
723 }
724 StateMachineEvent::EnteredSuspect { workload_id } => {
725 warn!(
726 "Workload {} entered Suspect (heartbeat quorum lost)",
727 workload_id
728 );
729 }
730 StateMachineEvent::Evicted {
731 workload_id,
732 reason,
733 } => {
734 error!("Workload {} evicted: {}", workload_id, reason);
735 }
736 StateMachineEvent::PublishLeaseRevocation {
737 workload_id,
738 standby_providers,
739 } => {
740 let (consumer_workload_id, state_uri) = {
748 let lock = self.active_workloads.lock().await;
749 let entry = lock.get(&workload_id);
750 let cid = entry
751 .and_then(|w| w.consumer_workload_id.clone())
752 .unwrap_or_else(|| format!("vmid-{}", workload_id));
753 let suri = entry.and_then(|w| w.state_uri.clone());
754 (cid, suri)
755 };
756 let revocation = LeaseRevocationContent {
757 workload_id: consumer_workload_id.clone(),
758 primary_provider_npub: self.get_npub(),
759 standby_providers: standby_providers.clone(),
760 reason: "heartbeat-quorum-lost-past-t2".to_string(),
761 revoked_at: now,
762 state_uri,
763 version: crate::nostr::SCHEMA_VERSION,
764 };
765 match self.nostr.publish_lease_revocation(revocation).await {
766 Ok(event_id) => info!(
767 "Published lease revocation for workload {} (vmid {}) to {} standby(s): {}",
768 consumer_workload_id,
769 workload_id,
770 standby_providers.len(),
771 event_id
772 ),
773 Err(e) => error!(
774 "Failed to publish lease revocation for workload {}: {}",
775 workload_id, e
776 ),
777 }
778 }
779 StateMachineEvent::AttemptRespawn {
780 workload_id,
781 attempt,
782 } => {
783 info!(
784 "Attempting respawn of workload {} (attempt {})",
785 workload_id, attempt
786 );
787 let mut sm = self.state_machine.lock().await;
796 sm.notify_respawn_failed(
797 workload_id,
798 "respawn handler not yet implemented (follow-up)",
799 );
800 }
801 StateMachineEvent::Failed {
802 workload_id,
803 reason,
804 } => {
805 error!("Workload {} marked Failed: {}", workload_id, reason);
806 let mut wl = self.active_workloads.lock().await;
810 wl.remove(&workload_id);
811 }
812 }
813 }
814
815 async fn cleanup_loop(&self) -> Result<()> {
817 let interval = tokio::time::Duration::from_secs(30);
818
819 loop {
820 tokio::time::sleep(interval).await;
821
822 let now = std::time::SystemTime::now()
823 .duration_since(std::time::UNIX_EPOCH)?
824 .as_secs();
825
826 let mut workloads = self.active_workloads.lock().await;
827 let expired: Vec<u32> = workloads
828 .iter()
829 .filter(|(_, w)| w.expires_at <= now)
830 .map(|(vmid, _)| *vmid)
831 .collect();
832
833 for vmid in expired {
834 info!("Cleaning up expired workload: {}", vmid);
835
836 if let Some(_workload) = workloads.remove(&vmid) {
837 let stop_result = self.backend.stop_container(vmid).await;
838 let result = match stop_result {
839 Ok(_) => self.backend.delete_container(vmid).await,
840 Err(e) => Err(e),
841 };
842
843 self.state_machine.lock().await.untrack(vmid);
849
850 match result {
851 Ok(_) => {
852 info!("Cleaned up workload {}", vmid);
853 let mut stats = self.stats.lock().await;
854 stats.total_jobs_completed += 1;
855 }
856 Err(e) => error!("Failed to cleanup workload {}: {}", vmid, e),
857 }
858 }
859 }
860 drop(workloads);
861
862 let mut slots = self.standby_slots.lock().await;
871 let expired_slots: Vec<String> = slots
872 .iter()
873 .filter(|(_, slot)| slot.expires_at <= now)
874 .map(|(workload_id, _)| workload_id.clone())
875 .collect();
876 for workload_id in expired_slots {
877 if let Some(slot) = slots.remove(&workload_id) {
878 info!(
879 "Expiring standby slot for workload {} (index {}/{}, primary {}, expired at {})",
880 workload_id, slot.standby_index, slot.standby_count, slot.primary_npub, slot.expires_at
881 );
882 }
883 }
884 }
885 }
886
887 async fn standby_watchdog_loop(&self) -> Result<()> {
929 let interval = tokio::time::Duration::from_secs(STANDBY_WATCHDOG_INTERVAL_SECS);
930 info!(
931 "Standby watchdog loop starting (cadence: {}s, silence threshold: {}s)",
932 STANDBY_WATCHDOG_INTERVAL_SECS, STANDBY_HEARTBEAT_SILENCE_SECS
933 );
934
935 loop {
936 tokio::time::sleep(interval).await;
937
938 let slots: Vec<StandbySlot> = {
939 let lock = self.standby_slots.lock().await;
940 lock.values().cloned().collect()
941 };
942 if slots.is_empty() {
943 continue;
944 }
945
946 let primary_npubs: Vec<String> = slots
950 .iter()
951 .map(|s| s.primary_npub.clone())
952 .collect::<std::collections::HashSet<_>>()
953 .into_iter()
954 .collect();
955
956 let heartbeats = match self.nostr.get_latest_heartbeats_multi(primary_npubs).await {
957 Ok(hb) => hb,
958 Err(e) => {
959 warn!(
960 "standby watchdog: heartbeat batch query failed: {}; \
961 skipping this tick (will retry next interval)",
962 e
963 );
964 continue;
965 }
966 };
967
968 let now = std::time::SystemTime::now()
969 .duration_since(std::time::UNIX_EPOCH)?
970 .as_secs();
971
972 for slot in slots {
973 let last_seen = heartbeats
974 .get(&slot.primary_npub)
975 .map(|hb| hb.timestamp)
976 .unwrap_or(0);
977 let silence_baseline = if last_seen == 0 {
985 slot.created_at
986 } else {
987 last_seen
988 };
989 if !primary_is_silent(now, silence_baseline, STANDBY_HEARTBEAT_SILENCE_SECS) {
990 continue;
991 }
992 let silence_secs = now.saturating_sub(silence_baseline);
993 warn!(
994 "Primary {} silent for {}s on slot workload_id={} (threshold {}s); \
995 triggering standby promotion (assumes hard crash; deduped \
996 intra-process via slot.remove and inter-process via the \
997 promotion-announcement event published post-spawn)",
998 slot.primary_npub,
999 silence_secs,
1000 slot.workload_id,
1001 STANDBY_HEARTBEAT_SILENCE_SECS
1002 );
1003 schedule_standby_promotion(
1004 self.backend.clone(),
1005 self.active_workloads.clone(),
1006 self.state_machine.clone(),
1007 self.standby_slots.clone(),
1008 self.nostr.clone(),
1009 slot,
1010 );
1011 }
1012 }
1013 }
1014}
1015
/// Seconds between standby watchdog ticks (the sleep in `standby_watchdog_loop`).
const STANDBY_WATCHDOG_INTERVAL_SECS: u64 = 30;

/// Seconds a primary may go without a visible heartbeat before the standby watchdog schedules
/// promotion of the matching standby slot (passed as the threshold to `primary_is_silent`).
const STANDBY_HEARTBEAT_SILENCE_SECS: u64 = 180;
1028
/// Reports whether at least `threshold` seconds separate `baseline` from `now`.
///
/// A `baseline` of `0` means "no reference point" and is never treated as silent. If `now`
/// is earlier than `baseline` (clock skew), the elapsed time counts as zero.
fn primary_is_silent(now: u64, baseline: u64, threshold: u64) -> bool {
    match baseline {
        0 => false,
        since => now.saturating_sub(since) >= threshold,
    }
}
1050
1051async fn handle_spawn_request(
1061 backend: &dyn ComputeBackend,
1062 config: &ProviderConfig,
1063 nostr: &NostrRelaySubscriber,
1064 redeemer: &dyn MintRedeemer,
1065 workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
1066 stats: &Arc<Mutex<ProviderStats>>,
1067 state_machine: &Arc<Mutex<WorkloadStateMachine>>,
1068 standby_slots: &Arc<Mutex<HashMap<String, StandbySlot>>>,
1069 requester_pubkey: &str,
1070 message_type: &str,
1071 request: EncryptedSpawnPodRequest,
1072) -> Result<()> {
1073 info!(
1074 "Processing spawn request from {} (tier: {:?})",
1075 requester_pubkey, request.pod_spec_id
1076 );
1077
1078 let role = compute_warm_standby_role(&nostr.get_service_public_key(), &request);
1085 if matches!(role, WarmStandbyRole::NotAddressed) {
1086 if request
1087 .replication
1088 .as_ref()
1089 .map(|r| {
1090 matches!(
1091 r,
1092 crate::durable_workload::ReplicationMode::WarmStandby { .. }
1093 )
1094 })
1095 .unwrap_or(false)
1096 {
1097 let err_msg =
1102 "warm-standby spawn arrived at a provider not designated as primary or standby";
1103 warn!("{}", err_msg);
1104 nostr
1105 .send_error_response(
1106 requester_pubkey,
1107 "not_addressed",
1108 err_msg,
1109 None,
1110 message_type,
1111 )
1112 .await?;
1113 return Ok(());
1114 }
1115 }
1116
1117 let payment_msats = match validate_and_redeem(
1119 redeemer,
1120 &config.whitelisted_mints,
1121 &request.cashu_token,
1122 )
1123 .await
1124 {
1125 Ok(v) => v,
1126 Err(e) => {
1127 let (error_type, err_msg) = redeem_error_to_response(&e);
1128 error!("Cashu redemption failed: {}", err_msg);
1129 nostr
1130 .send_error_response(requester_pubkey, error_type, &err_msg, None, message_type)
1131 .await?;
1132 return Ok(());
1133 }
1134 };
1135
1136 let spec = match config
1138 .specs
1139 .iter()
1140 .find(|s| Some(s.id.clone()) == request.pod_spec_id)
1141 {
1142 Some(s) => s,
1143 None => {
1144 if let Some(s) = config.specs.first() {
1146 s
1147 } else {
1148 let err_msg = "No pod specifications available on this provider";
1149 error!("{}", err_msg);
1150 nostr
1151 .send_error_response(requester_pubkey, "no_specs", err_msg, None, message_type)
1152 .await?;
1153 return Ok(());
1154 }
1155 }
1156 };
1157
1158 let duration_secs = payment_msats / spec.rate_msats_per_sec;
1160 if duration_secs < config.minimum_duration_seconds {
1161 let err_msg = format!(
1162 "Insufficient payment for minimum duration. Required: {} msats for {}s",
1163 config.minimum_duration_seconds * spec.rate_msats_per_sec,
1164 config.minimum_duration_seconds
1165 );
1166 warn!("{}", err_msg);
1167 nostr
1168 .send_error_response(
1169 requester_pubkey,
1170 "insufficient_payment",
1171 &err_msg,
1172 None,
1173 message_type,
1174 )
1175 .await?;
1176 return Ok(());
1177 }
1178
1179 info!(
1180 "Validated payment: {} msats for {}s on tier {}",
1181 payment_msats, duration_secs, spec.name
1182 );
1183
1184 let id = match backend
1186 .find_available_id(config.vmid_range_start, config.vmid_range_end)
1187 .await
1188 {
1189 Ok(id) => id,
1190 Err(e) => {
1191 let err_msg = format!("Failed to find available ID: {}", e);
1192 error!("{}", err_msg);
1193 nostr
1194 .send_error_response(
1195 requester_pubkey,
1196 "provisioning_error",
1197 &err_msg,
1198 None,
1199 message_type,
1200 )
1201 .await?;
1202 return Ok(());
1203 }
1204 };
1205
1206 let password = generate_password();
1208
1209 let host_port = match config.ssh_port_start {
1211 Some(start) => start + (id - config.vmid_range_start) as u16,
1212 None => 30000 + (id % 10000) as u16,
1213 };
1214
1215 let template = if let Some(slug) = request.template_slug.as_deref() {
1220 match TemplateName::from_slug(slug) {
1221 Some(name) => Some(TemplateDefinition::lookup(name)),
1222 None => {
1223 let err_msg = format!(
1224 "Unknown template `{}` — provider does not advertise it",
1225 slug
1226 );
1227 warn!("{}", err_msg);
1228 nostr
1229 .send_error_response(
1230 requester_pubkey,
1231 "unknown_template",
1232 &err_msg,
1233 None,
1234 message_type,
1235 )
1236 .await?;
1237 return Ok(());
1238 }
1239 }
1240 } else {
1241 None
1242 };
1243
1244 let image = template
1246 .as_ref()
1247 .map(|t| t.image.to_string())
1248 .unwrap_or_else(|| request.pod_image.clone());
1249
1250 let template_ports: Vec<PortMapping> = template
1256 .as_ref()
1257 .map(|t| {
1258 t.ports
1259 .iter()
1260 .enumerate()
1261 .map(|(i, p)| PortMapping {
1262 host_port: host_port.saturating_add(1 + i as u16),
1263 container_port: p.container_port,
1264 protocol: "tcp",
1265 })
1266 .collect()
1267 })
1268 .unwrap_or_default();
1269
1270 let mut template_env: HashMap<String, String> = template
1271 .as_ref()
1272 .map(|t| {
1273 t.env
1274 .iter()
1275 .map(|(k, v)| (k.to_string(), v.to_string()))
1276 .collect()
1277 })
1278 .unwrap_or_default();
1279
1280 if let Some(t) = template.as_ref() {
1292 if t.env.contains_key("EXEC_USER") {
1293 template_env.insert("EXEC_USER".to_string(), "root".to_string());
1294 }
1295 if t.env.contains_key("EXEC_PASS") {
1296 template_env.insert("EXEC_PASS".to_string(), password.clone());
1297 }
1298 }
1299
1300 let extra_runtime_args: Vec<String> = template
1301 .as_ref()
1302 .map(|t| t.extra_docker_args.iter().map(|s| s.to_string()).collect())
1303 .unwrap_or_default();
1304
1305 let data_path: Option<String> = template
1306 .as_ref()
1307 .and_then(|t| t.data_path.map(|p| p.to_string()));
1308
1309 let volume_encryption_key = match (&data_path, request.volume_encryption.as_ref()) {
1315 (Some(_), Some(ve)) => match ve.decoded_key() {
1316 Ok(key) => {
1317 info!(
1318 "Spawn request includes volume_encryption (algorithm={}, version={}); will create LUKS-encrypted data volume",
1319 ve.algorithm, ve.version
1320 );
1321 Some(key)
1322 }
1323 Err(e) => {
1324 error!(
1325 "Rejecting spawn: malformed volume_encryption.key_b64: {}",
1326 e
1327 );
1328 let err_payload = ErrorResponseContent {
1329 error_type: "invalid_volume_encryption".to_string(),
1330 message: format!("volume_encryption rejected: {}", e),
1331 details: None,
1332 };
1333 let _ = nostr
1334 .send_error_response_private_message(
1335 requester_pubkey,
1336 err_payload,
1337 message_type,
1338 )
1339 .await;
1340 return Ok(());
1341 }
1342 },
1343 (None, Some(_)) => {
1344 warn!(
1345 "Spawn request set volume_encryption but template has no data_path; encryption is a no-op for stateless workloads"
1346 );
1347 None
1348 }
1349 _ => None,
1350 };
1351
1352 let container_config = ContainerConfig {
1354 id,
1355 name: format!("paygress-{}", id),
1356 image,
1357 cpu_cores: (spec.cpu_millicores / 1000).max(1) as u32,
1358 memory_mb: spec.memory_mb as u32,
1359 storage_gb: 10, password: password.clone(),
1361 ssh_key: None,
1362 host_port: Some(host_port),
1363 template_ports,
1364 template_env,
1365 extra_runtime_args,
1366 data_path,
1367 volume_encryption_key,
1368 };
1369
1370 if let WarmStandbyRole::Standby { index, count } = role {
1383 let workload_id = match request.workload_id.as_deref() {
1384 Some(id) if !id.is_empty() => id.to_string(),
1385 _ => {
1386 let err_msg = "warm-standby spawn missing workload_id (consumer-assigned UUID required to coordinate primary + standbys)";
1387 warn!("{}", err_msg);
1388 nostr
1389 .send_error_response(
1390 requester_pubkey,
1391 "missing_workload_id",
1392 err_msg,
1393 None,
1394 message_type,
1395 )
1396 .await?;
1397 return Ok(());
1398 }
1399 };
1400 let primary_npub = request.primary_npub.clone().unwrap_or_default();
1401 let now = std::time::SystemTime::now()
1402 .duration_since(std::time::UNIX_EPOCH)?
1403 .as_secs();
1404 let self_npub = nostr.get_service_public_key();
1411 let peer_standby_npubs: Vec<String> = match request.replication.as_ref() {
1412 Some(crate::durable_workload::ReplicationMode::WarmStandby { standby_providers }) => {
1413 standby_providers
1414 .iter()
1415 .filter(|p| !crate::nostr::npubs_equal(p, &self_npub))
1416 .cloned()
1417 .collect()
1418 }
1419 _ => Vec::new(),
1420 };
1421 let slot = StandbySlot {
1422 workload_id: workload_id.clone(),
1423 primary_npub,
1424 standby_index: index,
1425 standby_count: count,
1426 container_config: container_config.clone(),
1427 spec_id: spec.id.clone(),
1428 expires_at: now + duration_secs,
1429 owner_npub: requester_pubkey.to_string(),
1430 created_at: now,
1431 peer_standby_npubs,
1432 };
1433 info!(
1434 "Reserved standby slot for workload_id={} (index {}/{}, expires at {})",
1435 workload_id, index, count, slot.expires_at
1436 );
1437 standby_slots.lock().await.insert(workload_id.clone(), slot);
1438
1439 let expires_dt =
1446 chrono::DateTime::from_timestamp((now + duration_secs) as i64, 0).unwrap_or_default();
1447 let details = AccessDetailsContent {
1448 pod_npub: format!("standby-slot-{}", workload_id),
1449 node_port: 0, expires_at: expires_dt.to_rfc3339(),
1451 cpu_millicores: spec.cpu_millicores,
1452 memory_mb: spec.memory_mb,
1453 pod_spec_name: spec.name.clone(),
1454 pod_spec_description: spec.description.clone(),
1455 instructions: vec![
1456 format!(
1457 "🛏️ Standby slot reserved (index {}/{} for workload {}).",
1458 index, count, workload_id
1459 ),
1460 format!(
1461 "Will promote on LeaseRevocation event from primary {}.",
1462 request.primary_npub.as_deref().unwrap_or("(unset)")
1463 ),
1464 format!(
1465 "Expected promotion delay: {} seconds (index * 30s backoff).",
1466 index * 30
1467 ),
1468 ],
1469 host_address: config.public_ip.clone(),
1470 template_ports: Vec::new(),
1471 };
1472 nostr
1473 .send_access_details_private_message(requester_pubkey, details, message_type)
1474 .await?;
1475 return Ok(());
1476 }
1477
1478 debug!("Calling backend.create_container for workload {}", id);
1479 if let Err(e) = backend.create_container(&container_config).await {
1480 let err_msg = format!("Backend failed to create workload: {}", e);
1481 error!("{}", err_msg);
1482 nostr
1483 .send_error_response(
1484 requester_pubkey,
1485 "backend_error",
1486 &err_msg,
1487 None,
1488 message_type,
1489 )
1490 .await?;
1491 return Ok(());
1492 }
1493 debug!("Successfully created container {}", id);
1494
1495 let now = std::time::SystemTime::now()
1496 .duration_since(std::time::UNIX_EPOCH)?
1497 .as_secs();
1498
1499 let replication = request
1511 .replication
1512 .clone()
1513 .unwrap_or_else(crate::durable_workload::ReplicationMode::default);
1514 let workload = WorkloadInfo {
1515 vmid: id,
1516 workload_type: "lxc".to_string(), spec_id: spec.id.clone(),
1518 created_at: now,
1519 expires_at: now + duration_secs,
1520 owner_npub: requester_pubkey.to_string(),
1521 replication,
1522 restart_policy: crate::durable_workload::RestartPolicy::default(),
1523 state_uri: None,
1524 consumer_workload_id: request.workload_id.clone().filter(|s| !s.is_empty()),
1528 };
1529
1530 workloads.lock().await.insert(id, workload.clone());
1531
1532 state_machine.lock().await.track(DurableWorkload {
1536 workload_id: id,
1537 provider_npub: nostr.get_service_public_key(),
1538 state: WorkloadState::Provisioning { since: now },
1539 replication: workload.replication.clone(),
1540 restart_policy: workload.restart_policy,
1541 state_uri: workload.state_uri.clone(),
1542 created_at: now,
1543 expires_at: workload.expires_at,
1544 });
1545
1546 {
1548 let mut s = stats.lock().await;
1549 s.total_jobs_completed += 1;
1550 }
1551
1552 let host = &config.public_ip;
1555
1556 let template_access_ports: Vec<crate::nostr::TemplateAccessPort> = container_config
1562 .template_ports
1563 .iter()
1564 .map(|p| {
1565 let label = template
1566 .as_ref()
1567 .and_then(|t| {
1568 t.ports
1569 .iter()
1570 .find(|tp| tp.container_port == p.container_port)
1571 })
1572 .map(|tp| tp.label.to_string())
1573 .unwrap_or_else(|| format!("port-{}", p.container_port));
1574 crate::nostr::TemplateAccessPort {
1575 host_port: p.host_port,
1576 container_port: p.container_port,
1577 protocol: p.protocol.to_string(),
1578 label,
1579 }
1580 })
1581 .collect();
1582
1583 let expires_dt =
1585 chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
1586
1587 let mut instructions = vec![
1590 format!("🚀 Workload provisioned successfully!"),
1591 format!("👤 Username: root"),
1592 format!("🔑 Password: {}", password),
1593 format!("⌛ Expires: {}", expires_dt.format("%Y-%m-%d %H:%M:%S UTC")),
1594 format!("Access: You can connect to the container using SSH."),
1595 format!(" ssh -p {} root@{}", host_port, host),
1596 ];
1597 if !template_access_ports.is_empty() {
1598 instructions.push(format!("Workload ports:"));
1599 for p in &template_access_ports {
1600 instructions.push(format!(
1601 " {} ({}): {}://{}:{}",
1602 p.label, p.protocol, p.protocol, host, p.host_port
1603 ));
1604 }
1605 }
1606
1607 let details = AccessDetailsContent {
1608 pod_npub: format!("container-{}", id),
1609 node_port: host_port,
1610 expires_at: expires_dt.to_rfc3339(),
1611 cpu_millicores: spec.cpu_millicores,
1612 memory_mb: spec.memory_mb,
1613 pod_spec_name: spec.name.clone(),
1614 pod_spec_description: spec.description.clone(),
1615 instructions,
1616 host_address: host.clone(),
1617 template_ports: template_access_ports,
1618 };
1619
1620 debug!("Sending access details to {}", requester_pubkey);
1621 nostr
1622 .send_access_details_private_message(requester_pubkey, details, message_type)
1623 .await?;
1624
1625 debug!("Access details sent successfully");
1626
1627 info!("Workload {} provisioned for {} seconds", id, duration_secs);
1628 Ok(())
1629}
1630
1631async fn handle_topup_request(
1643 config: &ProviderConfig,
1644 nostr: &NostrRelaySubscriber,
1645 redeemer: &dyn MintRedeemer,
1646 workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
1647 requester_pubkey: &str,
1648 message_type: &str,
1649 request: EncryptedTopUpPodRequest,
1650) -> Result<()> {
1651 info!(
1652 "Processing topup request from {} for {}",
1653 requester_pubkey, request.pod_npub
1654 );
1655
1656 let vmid = match parse_pod_npub(&request.pod_npub) {
1657 Some(v) => v,
1658 None => {
1659 let err_msg = format!(
1660 "Could not parse pod identifier `{}`; expected `container-<id>` or numeric id",
1661 request.pod_npub
1662 );
1663 warn!("{}", err_msg);
1664 nostr
1665 .send_error_response(
1666 requester_pubkey,
1667 "invalid_pod_id",
1668 &err_msg,
1669 None,
1670 message_type,
1671 )
1672 .await?;
1673 return Ok(());
1674 }
1675 };
1676
1677 let now = std::time::SystemTime::now()
1680 .duration_since(std::time::UNIX_EPOCH)?
1681 .as_secs();
1682
1683 let (spec_id, current_expires_at) = {
1684 let lock = workloads.lock().await;
1685 match lock.get(&vmid) {
1686 Some(w) if w.owner_npub == requester_pubkey => (w.spec_id.clone(), w.expires_at),
1687 Some(_) => {
1688 drop(lock);
1689 let err_msg = "Pod not owned by requester";
1690 warn!("{}: vmid={}", err_msg, vmid);
1691 nostr
1692 .send_error_response(requester_pubkey, "not_owner", err_msg, None, message_type)
1693 .await?;
1694 return Ok(());
1695 }
1696 None => {
1697 drop(lock);
1698 let err_msg = format!("Pod {} not found", request.pod_npub);
1699 warn!("{}", err_msg);
1700 nostr
1701 .send_error_response(
1702 requester_pubkey,
1703 "not_found",
1704 &err_msg,
1705 None,
1706 message_type,
1707 )
1708 .await?;
1709 return Ok(());
1710 }
1711 }
1712 };
1713
1714 if current_expires_at <= now {
1715 let err_msg = format!(
1716 "Pod {} lease has already expired; spawn a new pod instead",
1717 request.pod_npub
1718 );
1719 warn!("{}", err_msg);
1720 nostr
1721 .send_error_response(
1722 requester_pubkey,
1723 "lease_expired",
1724 &err_msg,
1725 None,
1726 message_type,
1727 )
1728 .await?;
1729 return Ok(());
1730 }
1731
1732 let spec = match config.specs.iter().find(|s| s.id == spec_id) {
1733 Some(s) => s.clone(),
1734 None => {
1735 let err_msg = format!(
1739 "Pod {} references unknown spec `{}`; provider misconfiguration",
1740 request.pod_npub, spec_id
1741 );
1742 error!("{}", err_msg);
1743 nostr
1744 .send_error_response(
1745 requester_pubkey,
1746 "spec_unavailable",
1747 &err_msg,
1748 None,
1749 message_type,
1750 )
1751 .await?;
1752 return Ok(());
1753 }
1754 };
1755
1756 let payment_msats = match validate_and_redeem(
1758 redeemer,
1759 &config.whitelisted_mints,
1760 &request.cashu_token,
1761 )
1762 .await
1763 {
1764 Ok(v) => v,
1765 Err(e) => {
1766 let (error_type, err_msg) = redeem_error_to_response(&e);
1767 error!("Topup redemption failed: {}", err_msg);
1768 nostr
1769 .send_error_response(requester_pubkey, error_type, &err_msg, None, message_type)
1770 .await?;
1771 return Ok(());
1772 }
1773 };
1774
1775 let extension_secs = payment_msats / spec.rate_msats_per_sec;
1776 if extension_secs == 0 {
1777 let err_msg = format!(
1778 "Insufficient topup: {} msats buys 0 seconds at {} msats/sec",
1779 payment_msats, spec.rate_msats_per_sec
1780 );
1781 warn!("{}", err_msg);
1782 nostr
1783 .send_error_response(
1784 requester_pubkey,
1785 "insufficient_payment",
1786 &err_msg,
1787 None,
1788 message_type,
1789 )
1790 .await?;
1791 return Ok(());
1792 }
1793
1794 let new_expires_at = {
1798 let mut lock = workloads.lock().await;
1799 match lock.get_mut(&vmid) {
1800 Some(w) if w.owner_npub == requester_pubkey => {
1801 w.expires_at = w.expires_at.saturating_add(extension_secs);
1802 w.expires_at
1803 }
1804 _ => {
1805 drop(lock);
1811 let err_msg =
1812 "Pod was cleaned up before topup could be applied; token has been spent";
1813 error!("{}: vmid={}", err_msg, vmid);
1814 nostr
1815 .send_error_response(requester_pubkey, "race_lost", err_msg, None, message_type)
1816 .await?;
1817 return Ok(());
1818 }
1819 }
1820 };
1821
1822 let new_expires_dt =
1823 chrono::DateTime::from_timestamp(new_expires_at as i64, 0).unwrap_or_default();
1824 let response = TopUpResponseContent {
1825 success: true,
1826 pod_npub: request.pod_npub.clone(),
1827 extended_duration_seconds: extension_secs,
1828 new_expires_at: new_expires_dt.to_rfc3339(),
1829 message: format!(
1830 "Lease extended by {}s ({} msats @ {} msats/sec)",
1831 extension_secs, payment_msats, spec.rate_msats_per_sec
1832 ),
1833 };
1834
1835 nostr
1836 .send_topup_response_private_message(requester_pubkey, response, message_type)
1837 .await?;
1838
1839 info!(
1840 "Topup applied to {}: +{}s (now expires at {})",
1841 request.pod_npub, extension_secs, new_expires_at
1842 );
1843 Ok(())
1844}
1845
1846fn generate_password() -> String {
1851 use rand::Rng;
1852 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1853 let mut rng = rand::thread_rng();
1854 (0..16)
1855 .map(|_| {
1856 let idx = rng.gen_range(0..CHARSET.len());
1857 CHARSET[idx] as char
1858 })
1859 .collect()
1860}
1861
/// Parses a pod identifier into its numeric container/VM id.
///
/// Accepts either the `container-<id>` form handed out in access details or a
/// bare numeric id. Returns `None` when the remaining text is not a valid `u32`.
pub fn parse_pod_npub(pod_npub: &str) -> Option<u32> {
    let id_text = pod_npub.strip_prefix("container-").unwrap_or(pod_npub);
    id_text.parse::<u32>().ok()
}
1873
1874fn redeem_error_to_response(err: &RedeemError) -> (&'static str, String) {
1879 match err {
1880 RedeemError::InvalidToken(msg) => {
1881 ("invalid_token", format!("Invalid Cashu token: {}", msg))
1882 }
1883 RedeemError::NonWhitelistedMint { mint_url } => (
1884 "non_whitelisted_mint",
1885 format!("Mint {} is not accepted by this provider", mint_url),
1886 ),
1887 RedeemError::AlreadySpent => (
1888 "token_already_spent",
1889 "This Cashu token has already been spent at the mint".to_string(),
1890 ),
1891 RedeemError::Pending => (
1892 "token_pending",
1893 "Token is pending at the mint; retry shortly".to_string(),
1894 ),
1895 RedeemError::Network(msg) => (
1896 "mint_network_error",
1897 format!("Could not reach mint: {}", msg),
1898 ),
1899 RedeemError::UnsupportedUnit(unit) => (
1900 "unsupported_unit",
1901 format!("Token unit {} is not supported", unit),
1902 ),
1903 RedeemError::MintError(msg) => ("mint_error", format!("Mint rejected redemption: {}", msg)),
1904 }
1905}
1906
1907async fn handle_status_request(
1909 config: &ProviderConfig,
1910 nostr: &NostrRelaySubscriber,
1911 workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
1912 requester_pubkey: &str,
1913 message_type: &str,
1914 request: StatusRequestContent,
1915) -> Result<()> {
1916 info!(
1917 "Processing status request for pod {} from {}",
1918 request.pod_id, requester_pubkey
1919 );
1920
1921 let vmid = request.pod_id.parse::<u32>().ok();
1923
1924 let workload = {
1925 let lock = workloads.lock().await;
1926 if let Some(vmid) = vmid {
1927 lock.get(&vmid).cloned()
1928 } else {
1929 lock.values()
1931 .find(|w| w.owner_npub == request.pod_id || w.owner_npub == requester_pubkey)
1932 .cloned()
1933 }
1934 };
1935
1936 let workload = match workload {
1937 Some(w) => w,
1938 None => {
1939 let err_msg = format!(
1940 "Workload {} not found or you don't have access",
1941 request.pod_id
1942 );
1943 warn!("{}", err_msg);
1944 nostr
1945 .send_error_response(requester_pubkey, "not_found", &err_msg, None, message_type)
1946 .await?;
1947 return Ok(());
1948 }
1949 };
1950
1951 let now = std::time::SystemTime::now()
1953 .duration_since(std::time::UNIX_EPOCH)?
1954 .as_secs();
1955
1956 let time_remaining = workload.expires_at.saturating_sub(now);
1957 let status = if time_remaining == 0 {
1958 "Expired"
1959 } else {
1960 "Running"
1961 };
1962
1963 let expires_dt =
1964 chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
1965
1966 let spec = config.specs.iter().find(|s| s.id == workload.spec_id);
1968 let cpu = spec.map(|s| s.cpu_millicores).unwrap_or(1000);
1969 let mem = spec.map(|s| s.memory_mb).unwrap_or(1024);
1970 let host_port = match config.ssh_port_start {
1971 Some(start) => start + (workload.vmid - config.vmid_range_start) as u16,
1972 None => (30000 + (workload.vmid % 10000)) as u16,
1973 };
1974
1975 let response = StatusResponseContent {
1976 pod_id: workload.vmid.to_string(),
1977 status: status.to_string(),
1978 expires_at: expires_dt.to_rfc3339(),
1979 time_remaining_seconds: time_remaining,
1980 cpu_millicores: cpu,
1981 memory_mb: mem,
1982 ssh_host: config.public_ip.clone(),
1983 ssh_port: host_port,
1984 ssh_username: "root".to_string(),
1985 };
1986
1987 nostr
1988 .send_status_response(requester_pubkey, response, message_type)
1989 .await?;
1990
1991 info!("Status response sent for workload {}", workload.vmid);
1992 Ok(())
1993}
1994
1995pub fn load_config(path: &str) -> Result<ProviderConfig> {
1997 let content =
1998 std::fs::read_to_string(path).context(format!("Failed to read config file: {}", path))?;
1999
2000 serde_json::from_str(&content).context("Failed to parse provider config")
2001}
2002
2003pub fn save_config(path: &str, config: &ProviderConfig) -> Result<()> {
2005 let content = serde_json::to_string_pretty(config)?;
2006 std::fs::write(path, content).context(format!("Failed to write config file: {}", path))?;
2007 Ok(())
2008}
2009
/// Per-index backoff, in seconds, before a warm standby promotes itself.
///
/// A standby at index `i` waits `i * STANDBY_PROMOTION_DELAY_SECS` before
/// attempting promotion, so lower-index standbys get the first chance to take over.
const STANDBY_PROMOTION_DELAY_SECS: u64 = 30;
2020
2021fn compute_warm_standby_role(
2029 self_npub: &str,
2030 request: &EncryptedSpawnPodRequest,
2031) -> WarmStandbyRole {
2032 use crate::durable_workload::ReplicationMode;
2033 match request.replication.as_ref() {
2034 Some(ReplicationMode::WarmStandby { standby_providers }) => {
2035 let primary = request.primary_npub.as_deref().unwrap_or("");
2036 warm_standby_role(self_npub, primary, standby_providers)
2037 }
2038 _ => WarmStandbyRole::Primary,
2042 }
2043}
2044
2045fn schedule_standby_promotion(
2056 backend: Arc<dyn ComputeBackend>,
2057 workloads: Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
2058 state_machine: Arc<Mutex<WorkloadStateMachine>>,
2059 standby_slots: Arc<Mutex<HashMap<String, StandbySlot>>>,
2060 nostr: NostrRelaySubscriber,
2061 slot: StandbySlot,
2062) {
2063 let delay_secs = (slot.standby_index as u64).saturating_mul(STANDBY_PROMOTION_DELAY_SECS);
2064 let workload_id = slot.workload_id.clone();
2065 let standby_index = slot.standby_index;
2066 info!(
2067 "Scheduling standby promotion for workload {} after {}s backoff (standby index {})",
2068 workload_id, delay_secs, standby_index
2069 );
2070 tokio::spawn(async move {
2071 if delay_secs > 0 {
2072 tokio::time::sleep(std::time::Duration::from_secs(delay_secs)).await;
2073 }
2074
2075 let still_present = {
2082 let mut slots = standby_slots.lock().await;
2083 slots.remove(&workload_id)
2084 };
2085 let slot = match still_present {
2086 Some(s) => s,
2087 None => {
2088 debug!(
2089 "Standby slot for workload {} already drained; skipping promotion",
2090 workload_id
2091 );
2092 return;
2093 }
2094 };
2095
2096 if !slot.peer_standby_npubs.is_empty() {
2105 match nostr
2106 .query_standby_promotion_announcements(&slot.workload_id, &slot.peer_standby_npubs)
2107 .await
2108 {
2109 Ok(Some(announcement)) => {
2110 info!(
2111 "Peer standby {} already promoted workload {} at {}; dropping slot without spawning",
2112 announcement.new_primary_npub,
2113 announcement.workload_id,
2114 announcement.promoted_at
2115 );
2116 return;
2117 }
2118 Ok(None) => {
2119 }
2121 Err(e) => {
2122 warn!(
2123 "Failed to query peer promotion announcements for workload {}: {}; proceeding with promotion (best-effort)",
2124 slot.workload_id, e
2125 );
2126 }
2127 }
2128 }
2129
2130 info!(
2131 "Promoting standby slot {} → primary (vmid {})",
2132 slot.workload_id, slot.container_config.id
2133 );
2134 if let Err(e) = backend.create_container(&slot.container_config).await {
2135 error!(
2136 "Standby promotion failed for workload {}: backend error: {}",
2137 slot.workload_id, e
2138 );
2139 standby_slots
2144 .lock()
2145 .await
2146 .insert(slot.workload_id.clone(), slot);
2147 return;
2148 }
2149
2150 let now = std::time::SystemTime::now()
2151 .duration_since(std::time::UNIX_EPOCH)
2152 .map(|d| d.as_secs())
2153 .unwrap_or(0);
2154 let workload = WorkloadInfo {
2155 vmid: slot.container_config.id,
2156 workload_type: "lxc".to_string(),
2157 spec_id: slot.spec_id.clone(),
2158 created_at: now,
2159 expires_at: slot.expires_at,
2160 owner_npub: slot.owner_npub.clone(),
2161 consumer_workload_id: Some(slot.workload_id.clone()),
2162 replication: crate::durable_workload::ReplicationMode::None,
2168 restart_policy: crate::durable_workload::RestartPolicy::default(),
2169 state_uri: None,
2170 };
2171 let active_workloads_count = {
2172 let mut w = workloads.lock().await;
2173 w.insert(slot.container_config.id, workload.clone());
2174 w.len() as u32
2175 };
2176
2177 state_machine
2178 .lock()
2179 .await
2180 .track(crate::durable_workload::DurableWorkload {
2181 workload_id: slot.container_config.id,
2182 provider_npub: String::new(), state: crate::durable_workload::WorkloadState::Provisioning { since: now },
2184 replication: workload.replication.clone(),
2185 restart_policy: workload.restart_policy,
2186 state_uri: workload.state_uri.clone(),
2187 created_at: now,
2188 expires_at: workload.expires_at,
2189 });
2190
2191 info!(
2192 "Standby promotion complete: workload {} now running locally (vmid {})",
2193 slot.workload_id, slot.container_config.id
2194 );
2195 let _ = active_workloads_count; let announcement = StandbyPromotionAnnouncementContent {
2206 workload_id: slot.workload_id.clone(),
2207 new_primary_npub: nostr.get_service_public_key(),
2208 promoted_at: now,
2209 version: crate::nostr::SCHEMA_VERSION,
2210 };
2211 if let Err(e) = nostr
2212 .publish_standby_promotion_announcement(announcement)
2213 .await
2214 {
2215 warn!(
2216 "Post-promotion announcement publish failed for workload {}: {}; peer standbys will not back off and may produce a duplicate primary",
2217 slot.workload_id, e
2218 );
2219 }
2220 });
2221}
2222
// Unit tests for:
// - warm-standby role selection (`compute_warm_standby_role`),
// - primary-silence detection (`primary_is_silent`),
// - standby-slot expiry selection.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::durable_workload::ReplicationMode;

    // Builds a minimal spawn request. Only `replication` and `primary_npub`
    // vary between tests; the remaining fields are fixed placeholders.
    fn req_with(
        replication: Option<ReplicationMode>,
        primary_npub: Option<&str>,
    ) -> EncryptedSpawnPodRequest {
        EncryptedSpawnPodRequest {
            cashu_token: "tok".to_string(),
            pod_spec_id: Some("basic".to_string()),
            pod_image: "ubuntu:22.04".to_string(),
            ssh_username: "u".to_string(),
            ssh_password: "p".to_string(),
            template_slug: None,
            replication,
            primary_npub: primary_npub.map(|s| s.to_string()),
            workload_id: Some("wid-test".to_string()),
            volume_encryption: None,
        }
    }

    #[test]
    fn role_is_primary_for_non_warm_standby() {
        // No replication requested: this provider is the primary.
        let r = compute_warm_standby_role("npub1self", &req_with(None, None));
        assert_eq!(r, WarmStandbyRole::Primary);

        // Replication modes other than WarmStandby also resolve to Primary.
        let r = compute_warm_standby_role(
            "npub1self",
            &req_with(Some(ReplicationMode::Checkpointed), None),
        );
        assert_eq!(r, WarmStandbyRole::Primary);
    }

    #[test]
    fn role_is_primary_when_self_is_designated_primary() {
        let r = compute_warm_standby_role(
            "npub1primary",
            &req_with(
                Some(ReplicationMode::WarmStandby {
                    standby_providers: vec!["npub1b".to_string(), "npub1c".to_string()],
                }),
                Some("npub1primary"),
            ),
        );
        assert_eq!(r, WarmStandbyRole::Primary);
    }

    #[test]
    fn role_is_standby_with_correct_index_when_self_in_list() {
        // The standby index is the zero-based position in `standby_providers`.
        let r = compute_warm_standby_role(
            "npub1c",
            &req_with(
                Some(ReplicationMode::WarmStandby {
                    standby_providers: vec!["npub1b".to_string(), "npub1c".to_string()],
                }),
                Some("npub1primary"),
            ),
        );
        assert_eq!(r, WarmStandbyRole::Standby { index: 1, count: 2 });
    }

    #[test]
    fn role_is_not_addressed_when_self_unknown_to_topology() {
        // Neither the primary nor in the standby list: the request is not for us.
        let r = compute_warm_standby_role(
            "npub1stranger",
            &req_with(
                Some(ReplicationMode::WarmStandby {
                    standby_providers: vec!["npub1b".to_string(), "npub1c".to_string()],
                }),
                Some("npub1primary"),
            ),
        );
        assert_eq!(r, WarmStandbyRole::NotAddressed);
    }

    // The `primary_is_silent` tests below call it as (now, baseline, threshold_secs).
    // The exact parameter names live outside this view.
    #[test]
    fn fresh_primary_heartbeat_is_not_silent() {
        // 60s since the last heartbeat, well under the 180s threshold.
        assert!(!primary_is_silent(1_000_000, 999_940, 180));
    }

    #[test]
    fn primary_just_past_threshold_is_silent() {
        // Exactly 180s elapsed counts as silent (inclusive threshold)...
        assert!(primary_is_silent(1_000_000, 999_820, 180));
        // ...while 179s does not.
        assert!(!primary_is_silent(1_000_000, 999_821, 180));
    }

    #[test]
    fn unset_baseline_is_not_silent() {
        // A baseline of 0 is treated as "no heartbeat recorded yet" and never
        // reported silent, regardless of `now`.
        assert!(!primary_is_silent(1_000_000, 0, 180));
        assert!(!primary_is_silent(50, 0, 180));
    }

    #[test]
    fn fresh_slot_within_grace_window_is_not_silent() {
        // Uses the slot's creation time as the baseline: 30s after reservation
        // is still inside the 180s window.
        let created_at = 1_000_000;
        let now = created_at + 30;
        assert!(!primary_is_silent(now, created_at, 180));
    }

    #[test]
    fn fresh_slot_past_grace_window_is_silent() {
        // 180s after reservation with no newer baseline: silent.
        let created_at = 1_000_000;
        let now = created_at + 180;
        assert!(primary_is_silent(now, created_at, 180));
    }

    #[test]
    fn clock_skew_underflow_does_not_panic_or_misfire() {
        // `now` earlier than the baseline must neither panic nor report silence.
        assert!(!primary_is_silent(100, 200, 180));
    }

    // Builds a `StandbySlot` with placeholder container settings; only the
    // workload id and expiry vary between tests.
    fn make_slot(workload_id: &str, expires_at: u64) -> StandbySlot {
        StandbySlot {
            workload_id: workload_id.to_string(),
            primary_npub: "npub1primary".to_string(),
            standby_index: 0,
            standby_count: 1,
            container_config: ContainerConfig {
                id: 1,
                name: "test".to_string(),
                image: "img".to_string(),
                cpu_cores: 1,
                memory_mb: 1024,
                storage_gb: 10,
                password: "p".to_string(),
                ssh_key: None,
                host_port: None,
                template_ports: vec![],
                template_env: HashMap::new(),
                extra_runtime_args: vec![],
                data_path: None,
                volume_encryption_key: None,
            },
            spec_id: "basic".to_string(),
            expires_at,
            owner_npub: "npub1owner".to_string(),
            created_at: 0,
            peer_standby_npubs: vec![],
        }
    }

    // NOTE(review): this is a local re-implementation of the expiry predicate
    // (`expires_at <= now`), not a call into production code. The tests below
    // only pin this copy. TODO confirm it matches the production slot-expiry
    // sweep, or switch these tests to call that code directly.
    fn select_expired(slots: &HashMap<String, StandbySlot>, now: u64) -> Vec<String> {
        slots
            .iter()
            .filter(|(_, slot)| slot.expires_at <= now)
            .map(|(workload_id, _)| workload_id.clone())
            .collect()
    }

    #[test]
    fn select_expired_returns_only_past_expiry_slots() {
        let mut slots = HashMap::new();
        slots.insert("active".to_string(), make_slot("active", 2_000));
        slots.insert("expired".to_string(), make_slot("expired", 999));
        // HashMap iteration order is unspecified; sort before comparing.
        let mut expired = select_expired(&slots, 1_000);
        expired.sort();
        assert_eq!(expired, vec!["expired".to_string()]);
    }

    #[test]
    fn select_expired_treats_expires_at_equals_now_as_expired() {
        let mut slots = HashMap::new();
        // Boundary: expires_at == now counts as expired.
        slots.insert("boundary".to_string(), make_slot("boundary", 1_000));
        let expired = select_expired(&slots, 1_000);
        assert_eq!(expired, vec!["boundary".to_string()]);
    }

    #[test]
    fn select_expired_returns_empty_when_no_slots_expired() {
        let mut slots = HashMap::new();
        slots.insert("a".to_string(), make_slot("a", 9_999));
        slots.insert("b".to_string(), make_slot("b", 9_999));
        assert!(select_expired(&slots, 1_000).is_empty());
    }
}