use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use crate::cashu::{
derive_seed_from_nostr_key, validate_and_redeem, CdkRedeemer, MintRedeemer, RedeemError,
};
use crate::compute::{ComputeBackend, ContainerConfig, PortMapping};
use crate::docker::DockerBackend;
use crate::durable_workload::{
DurableWorkload, HeartbeatObservation, QuorumConfig, StateMachineEvent, WorkloadState,
WorkloadStateMachine,
};
use crate::lxd::LxdBackend;
use crate::nostr::{
parse_private_message_content, warm_standby_role, AccessDetailsContent, CapacityInfo,
EncryptedSpawnPodRequest, EncryptedTopUpPodRequest, ErrorResponseContent, HeartbeatContent,
LeaseRevocationContent, NostrRelaySubscriber, PodSpec, PrivateRequest, ProviderOfferContent,
RelayConfig, StandbyPromotionAnnouncementContent, StatusRequestContent, StatusResponseContent,
TopUpResponseContent, WarmStandbyRole,
};
use crate::proxmox::{ProxmoxBackend, ProxmoxClient};
use crate::templates::{TemplateDefinition, TemplateName};
/// Compute backend a provider runs workloads on.
///
/// Read from `ProviderConfig::backend_type`; `ProviderService::new` builds the
/// matching `ComputeBackend` implementation for the selected variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackendType {
/// Proxmox backend (`ProxmoxBackend`); this is also the `Default` variant.
Proxmox,
/// LXD backend (`LxdBackend`).
LXD,
/// Docker backend (`DockerBackend`).
Docker,
/// KVM backend (`crate::kvm::KvmBackend`); its availability is checked when
/// the service is constructed.
Kvm,
}
impl Default for BackendType {
fn default() -> Self {
Self::Proxmox
}
}
/// Static configuration for a provider process.
///
/// Fields marked `#[serde(default)]` may be omitted from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderConfig {
/// Which compute backend to construct; defaults to `BackendType::Proxmox`.
#[serde(default)]
pub backend_type: BackendType,
/// Proxmox API connection settings passed to `ProxmoxClient::new`.
pub proxmox_url: String,
pub proxmox_token_id: String,
pub proxmox_token_secret: String,
pub proxmox_node: String,
/// Storage and bridge names; handed to both the Proxmox and the LXD backend.
pub proxmox_storage: String,
/// Template handed to the Proxmox backend.
pub proxmox_template: String,
pub proxmox_bridge: String,
/// Range passed to `ComputeBackend::find_available_id` when allocating a workload id.
pub vmid_range_start: u32,
pub vmid_range_end: u32,
/// Nostr private key; used for the relay subscriber and to derive the Cashu wallet seed.
pub nostr_private_key: String,
/// Relay URLs the Nostr subscriber is configured with.
pub nostr_relays: Vec<String>,
/// Published as `hostname` in the provider offer.
pub provider_name: String,
/// Published as `location` in the provider offer.
pub provider_location: Option<String>,
/// Sent to consumers as the host address in access details.
pub public_ip: String,
/// Published verbatim in the provider offer.
pub capabilities: Vec<String>,
/// Pod tiers offered; the first entry is the fallback when a spawn request
/// names no (or an unknown) spec id.
pub specs: Vec<PodSpec>,
/// Mint list passed to `validate_and_redeem` and published in the offer.
pub whitelisted_mints: Vec<String>,
/// Sleep between heartbeats, in seconds.
pub heartbeat_interval_secs: u64,
/// Spawn requests whose payment buys fewer seconds than this are rejected.
pub minimum_duration_seconds: u64,
// NOTE(review): `tunnel_enabled`, `tunnel_interface` and `ssh_port_end` are not
// read anywhere in the visible part of this file — confirm their consumers.
#[serde(default)]
pub tunnel_enabled: bool,
#[serde(default)]
pub tunnel_interface: Option<String>,
/// When set, a workload's host port is `ssh_port_start + (id - vmid_range_start)`;
/// otherwise `30000 + (id % 10000)` is used.
#[serde(default)]
pub ssh_port_start: Option<u16>,
#[serde(default)]
pub ssh_port_end: Option<u16>,
/// Path of the redb-backed Cashu wallet database opened at startup.
#[serde(default = "default_cashu_wallet_db_path")]
pub cashu_wallet_db_path: String,
}
/// Serde default for `ProviderConfig::cashu_wallet_db_path`: a redb file in the
/// current working directory.
fn default_cashu_wallet_db_path() -> String {
    String::from("./paygress-cashu-wallet.redb")
}
/// Baseline configuration: a local Proxmox endpoint, two public relays, one
/// "basic" pod spec and a single whitelisted mint. Secrets are left empty.
impl Default for ProviderConfig {
    fn default() -> Self {
        // The single tier offered out of the box.
        let basic_spec = PodSpec {
            id: String::from("basic"),
            name: String::from("Basic"),
            description: String::from("1 vCPU, 1GB RAM"),
            cpu_millicores: 1000,
            memory_mb: 1024,
            rate_msats_per_sec: 50,
        };
        let relay_urls = ["wss://relay.damus.io", "wss://nos.lol"];

        Self {
            backend_type: BackendType::Proxmox,
            proxmox_url: String::from("https://localhost:8006/api2/json"),
            proxmox_token_id: String::from("root@pam!paygress"),
            proxmox_token_secret: String::new(),
            proxmox_node: String::from("pve"),
            proxmox_storage: String::from("local-lvm"),
            proxmox_template: String::from("local:vztmpl/ubuntu-22.04-standard.tar.zst"),
            proxmox_bridge: String::from("vmbr0"),
            vmid_range_start: 1000,
            vmid_range_end: 1999,
            nostr_private_key: String::new(),
            nostr_relays: relay_urls.iter().map(|url| String::from(*url)).collect(),
            provider_name: String::from("Paygress Provider"),
            provider_location: None,
            public_ip: String::from("127.0.0.1"),
            capabilities: vec![String::from("lxc")],
            specs: vec![basic_spec],
            whitelisted_mints: vec![String::from("https://mint.minibits.cash")],
            heartbeat_interval_secs: 60,
            minimum_duration_seconds: 60,
            tunnel_enabled: false,
            tunnel_interface: None,
            ssh_port_start: None,
            ssh_port_end: None,
            cashu_wallet_db_path: default_cashu_wallet_db_path(),
        }
    }
}
/// Bookkeeping record for one running workload, keyed by `vmid` in
/// `ProviderService::active_workloads`.
#[derive(Debug, Clone, Serialize)]
pub struct WorkloadInfo {
/// Backend id of the workload (also the map key).
pub vmid: u32,
/// `workload_type` is a free-form type tag (the spawn handler stores "lxc");
/// `spec_id` is the id of the `PodSpec` the workload was sold under.
pub workload_type: String, pub spec_id: String,
/// Unix timestamps in seconds.
pub created_at: u64,
/// The cleanup loop tears the workload down once this is `<= now`.
pub expires_at: u64,
/// Public key of the requester; top-ups are only accepted from this key.
pub owner_npub: String,
#[serde(default)]
pub replication: crate::durable_workload::ReplicationMode,
#[serde(default)]
pub restart_policy: crate::durable_workload::RestartPolicy,
/// Included in lease revocations when present.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state_uri: Option<String>,
/// Consumer-assigned workload id; lease revocations fall back to
/// `vmid-<vmid>` when this is `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub consumer_workload_id: Option<String>,
}
/// A reserved (not yet running) standby for a warm-standby workload, keyed by
/// `workload_id` in `ProviderService::standby_slots`.
#[derive(Debug, Clone)]
pub struct StandbySlot {
/// Consumer-assigned workload id shared by the primary and its standbys.
pub workload_id: String,
/// Primary this slot shadows; revocations naming a different primary are ignored.
pub primary_npub: String,
/// Role position and total reported by the warm-standby role computation.
pub standby_index: usize,
pub standby_count: usize,
/// Container configuration captured at reservation time.
pub container_config: ContainerConfig,
pub spec_id: String,
/// Unix seconds; the cleanup loop drops the slot once this is `<= now`.
pub expires_at: u64,
/// Public key of the requester that reserved the slot.
pub owner_npub: String,
/// Unix seconds; used as the silence baseline when no primary heartbeat is known.
pub created_at: u64,
/// The other standby providers named in the request (this provider filtered out).
pub peer_standby_npubs: Vec<String>,
}
/// Long-running provider process: publishes offers and heartbeats, serves
/// spawn/status/top-up requests, and cleans up expired workloads.
pub struct ProviderService {
config: ProviderConfig,
/// Compute backend chosen from `config.backend_type`.
backend: Arc<dyn ComputeBackend>,
nostr: NostrRelaySubscriber,
/// Redeems Cashu tokens attached to spawn and top-up requests.
redeemer: Arc<dyn MintRedeemer>,
/// Running workloads keyed by backend id.
active_workloads: Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
stats: Arc<Mutex<ProviderStats>>,
state_machine: Arc<Mutex<WorkloadStateMachine>>,
/// Heartbeat observations queued by `send_heartbeat` and drained by the
/// orchestrator loop on each tick.
observation_buffer: Arc<Mutex<Vec<HeartbeatObservation>>>,
/// Reserved standby slots keyed by consumer workload id.
standby_slots: Arc<Mutex<HashMap<String, StandbySlot>>>,
}
/// Counters kept for the lifetime of the process.
#[derive(Debug, Clone, Default)]
struct ProviderStats {
/// Published in the provider offer; incremented by the spawn handler and by
/// the cleanup loop.
total_jobs_completed: u64,
/// Unix seconds at which the service was constructed.
uptime_start: u64,
}
impl ProviderService {
/// Builds a provider service from `config`.
///
/// Steps, in order: construct the compute backend for `config.backend_type`,
/// connect the Nostr subscriber, open the Cashu wallet database, derive the
/// wallet seed from the Nostr key, and record the start time.
///
/// # Errors
/// Fails if the Proxmox client cannot be built, KVM is selected but
/// unavailable, the Nostr subscriber cannot be created, the wallet database
/// cannot be opened, or the system clock is before the Unix epoch.
pub async fn new(config: ProviderConfig) -> Result<Self> {
    let backend: Arc<dyn ComputeBackend> = match config.backend_type {
        BackendType::Proxmox => {
            let client = ProxmoxClient::new(
                &config.proxmox_url,
                &config.proxmox_token_id,
                &config.proxmox_token_secret,
                &config.proxmox_node,
            )?;
            let proxmox = ProxmoxBackend::new(
                client,
                &config.proxmox_storage,
                &config.proxmox_bridge,
                &config.proxmox_template,
            );
            Arc::new(proxmox)
        }
        // LXD reuses the Proxmox storage/bridge settings.
        BackendType::LXD => Arc::new(LxdBackend::new(
            &config.proxmox_storage,
            &config.proxmox_bridge,
        )),
        BackendType::Docker => Arc::new(DockerBackend::new()),
        BackendType::Kvm => {
            if let Err(e) = crate::kvm::KvmBackend::check_kvm_available().await {
                tracing::error!("KVM backend selected but unavailable: {}", e);
                anyhow::bail!("KVM backend unavailable: {}", e);
            }
            let kvm_config = crate::kvm::KvmConfig::default();
            Arc::new(crate::kvm::KvmBackend::new(kvm_config))
        }
    };

    let nostr = NostrRelaySubscriber::new(RelayConfig {
        relays: config.nostr_relays.clone(),
        private_key: Some(config.nostr_private_key.clone()),
    })
    .await?;

    let wallet_path = std::path::Path::new(&config.cashu_wallet_db_path);
    let wallet_db = match cdk_redb::wallet::WalletRedbDatabase::new(wallet_path) {
        Ok(db) => db,
        Err(e) => {
            return Err(anyhow::anyhow!(
                "failed to open cashu wallet database at {}: {}",
                config.cashu_wallet_db_path,
                e
            ))
        }
    };
    let seed = derive_seed_from_nostr_key(&config.nostr_private_key);
    let redeemer: Arc<dyn MintRedeemer> = Arc::new(CdkRedeemer::new(Arc::new(wallet_db), seed));

    let started_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs();
    let stats = ProviderStats {
        total_jobs_completed: 0,
        uptime_start: started_at,
    };
    let state_machine = WorkloadStateMachine::new(QuorumConfig::default());

    Ok(Self {
        config,
        backend,
        nostr,
        redeemer,
        active_workloads: Arc::new(Mutex::new(HashMap::new())),
        stats: Arc::new(Mutex::new(stats)),
        state_machine: Arc::new(Mutex::new(state_machine)),
        observation_buffer: Arc::new(Mutex::new(Vec::new())),
        standby_slots: Arc::new(Mutex::new(HashMap::new())),
    })
}
/// Returns the service public key as reported by the Nostr subscriber.
pub fn get_npub(&self) -> String {
    let subscriber = &self.nostr;
    subscriber.get_service_public_key()
}
/// Publishes the provider offer, then drives all background loops
/// concurrently.
///
/// Returns as soon as any one loop finishes, yielding that loop's result;
/// the exit is logged at error level regardless of whether it was `Ok` or `Err`.
pub async fn run(&self) -> Result<()> {
    info!("🚀 Starting Paygress Provider Service");
    info!("Provider: {}", self.config.provider_name);
    info!("NPUB: {}", self.get_npub());

    self.publish_offer().await?;

    // The loops are not expected to return; whichever does first ends `run`.
    tokio::select! {
        outcome = self.heartbeat_loop() => {
            error!("Heartbeat loop exited: {:?}", outcome);
            outcome
        }
        outcome = self.listen_for_requests() => {
            error!("Request listener exited: {:?}", outcome);
            outcome
        }
        outcome = self.cleanup_loop() => {
            error!("Cleanup loop exited: {:?}", outcome);
            outcome
        }
        outcome = self.orchestrator_loop() => {
            error!("Orchestrator loop exited: {:?}", outcome);
            outcome
        }
        outcome = self.standby_watchdog_loop() => {
            error!("Standby watchdog loop exited: {:?}", outcome);
            outcome
        }
    }
}
/// Publishes this provider's offer (specs, mints, capabilities, isolation
/// level) through the Nostr subscriber.
///
/// The job counter is copied out of `stats` up front so the mutex is not
/// held while the relay publish is in flight.
///
/// # Errors
/// Propagates any failure from `publish_provider_offer`.
async fn publish_offer(&self) -> Result<()> {
    // The guard is a statement temporary: it is released before any network I/O.
    let total_jobs_completed = self.stats.lock().await.total_jobs_completed;

    let isolation_level = match self.config.backend_type {
        BackendType::Kvm => crate::nostr::IsolationLevel::DedicatedHost,
        BackendType::Proxmox | BackendType::LXD | BackendType::Docker => {
            crate::nostr::IsolationLevel::SharedKernel
        }
    };
    let offer = ProviderOfferContent {
        provider_npub: self.get_npub(),
        hostname: self.config.provider_name.clone(),
        location: self.config.provider_location.clone(),
        capabilities: self.config.capabilities.clone(),
        specs: self.config.specs.clone(),
        whitelisted_mints: self.config.whitelisted_mints.clone(),
        // Uptime is reported as a constant; it is not measured here.
        uptime_percent: 100.0,
        total_jobs_completed,
        api_endpoint: None,
        version: crate::nostr::SCHEMA_VERSION,
        isolation_level,
        stake_proof: None,
    };
    self.nostr.publish_provider_offer(offer).await?;
    Ok(())
}
/// Sends a heartbeat, then sleeps `heartbeat_interval_secs`, forever.
///
/// A failed heartbeat is logged as a warning and does not stop the loop.
async fn heartbeat_loop(&self) -> Result<()> {
    let period = tokio::time::Duration::from_secs(self.config.heartbeat_interval_secs);
    loop {
        match self.send_heartbeat().await {
            Ok(()) => {}
            Err(e) => warn!("Failed to send heartbeat: {}", e),
        }
        tokio::time::sleep(period).await;
    }
}
/// Publishes one heartbeat and records which relays accepted it.
///
/// The active-workload count is snapshotted first and the `active_workloads`
/// lock released immediately, so spawn, top-up and cleanup are not blocked
/// while the backend status query and the relay publish are in flight.
///
/// If the backend status query fails, the heartbeat is still sent with all
/// capacity figures set to zero. Each accepting relay yields one
/// `HeartbeatObservation` pushed onto `observation_buffer`.
///
/// # Errors
/// Fails if the system clock is before the Unix epoch or the publish fails.
async fn send_heartbeat(&self) -> Result<()> {
    // Guard is a statement temporary; nothing below holds the workloads lock.
    let active_workloads = self.active_workloads.lock().await.len() as u32;

    let capacity = match self.backend.get_node_status().await {
        Ok(status) => CapacityInfo {
            cpu_available: ((1.0 - status.cpu_usage) * 100000.0) as u64,
            // The divisors convert to MiB and GiB respectively.
            memory_mb_available: status.memory_total.saturating_sub(status.memory_used)
                / (1024 * 1024),
            storage_gb_available: status.disk_total.saturating_sub(status.disk_used)
                / (1024 * 1024 * 1024),
        },
        Err(e) => {
            warn!("Failed to get node status: {}", e);
            CapacityInfo {
                cpu_available: 0,
                memory_mb_available: 0,
                storage_gb_available: 0,
            }
        }
    };

    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs();
    let heartbeat = HeartbeatContent {
        provider_npub: self.get_npub(),
        timestamp: now,
        active_workloads,
        available_capacity: capacity,
        version: crate::nostr::SCHEMA_VERSION,
    };
    let (_event_id, accepting_relays) = self.nostr.publish_heartbeat(heartbeat).await?;

    if !accepting_relays.is_empty() {
        let provider_npub = self.get_npub();
        let mut buf = self.observation_buffer.lock().await;
        for relay_url in accepting_relays {
            buf.push(HeartbeatObservation {
                provider_npub: provider_npub.clone(),
                relay_url,
                seen_at: now,
                event_timestamp: now,
            });
        }
    }
    Ok(())
}
/// Subscribes to pod events and dispatches each one to the matching handler.
///
/// Per event: events authored by this provider are ignored; lease-revocation
/// events are matched against local standby slots and may schedule a
/// promotion; everything else is parsed as a private request and routed to
/// the spawn, status or top-up handler. Handler failures are logged and do
/// not end the subscription.
///
/// # Errors
/// Propagates a failure of `subscribe_to_pod_events` itself.
async fn listen_for_requests(&self) -> Result<()> {
info!("Listening for Paygress requests...");
// Clone the shared handles so the `move` closure below owns its own copies.
let backend = self.backend.clone();
let config = self.config.clone();
let nostr = self.nostr.clone();
let redeemer = self.redeemer.clone();
let workloads = self.active_workloads.clone();
let stats = self.stats.clone();
let state_machine = self.state_machine.clone();
let standby_slots = self.standby_slots.clone();
self.nostr
.subscribe_to_pod_events(move |event| {
// Clone again per event: each async block takes ownership of its handles.
let backend = backend.clone();
let config = config.clone();
let nostr = nostr.clone();
let redeemer = redeemer.clone();
let workloads = workloads.clone();
let stats = stats.clone();
let state_machine = state_machine.clone();
let standby_slots = standby_slots.clone();
Box::pin(async move {
// Skip events this provider published itself.
let my_pubkey = nostr.public_key().to_hex();
if event.pubkey == my_pubkey {
return Ok(());
}
debug!(
"Handler received event kind: {}, from: {}, message_type: {}",
event.kind, event.pubkey, event.message_type
);
// Lease revocations are handled here and never reach the request parser.
if let Some(revocation) = crate::nostr::parse_revocation_event(&event) {
info!(
"Lease revocation observed: workload_id={}, primary={}, reason={}, state_uri={:?}, standbys={:?}",
revocation.workload_id,
revocation.primary_provider_npub,
revocation.reason,
revocation.state_uri,
revocation.standby_providers,
);
let workload_id = revocation.workload_id.clone();
let primary_npub = revocation.primary_provider_npub.clone();
// Clone the slot out so the lock is released before promotion is scheduled.
let slot_opt = standby_slots.lock().await.get(&workload_id).cloned();
if let Some(slot) = slot_opt {
// Only act on a revocation naming the primary this slot was reserved for.
if slot.primary_npub != primary_npub {
warn!(
"Revocation primary_npub ({}) does not match slot's primary ({}); ignoring",
primary_npub, slot.primary_npub
);
return Ok(());
}
schedule_standby_promotion(
backend.clone(),
workloads.clone(),
state_machine.clone(),
standby_slots.clone(),
nostr.clone(),
slot,
);
} else {
debug!(
"Revocation workload_id={} did not match any local standby slot; ignoring",
workload_id
);
}
return Ok(());
}
let request_type = match parse_private_message_content(&event.content) {
Ok(req) => req,
Err(e) => {
warn!("Failed to parse request from {}: {}", event.pubkey, e);
let error = ErrorResponseContent {
error_type: "invalid_request".to_string(),
message: "Failed to parse request".to_string(),
details: Some(e.to_string()),
};
// Best effort: a failure to deliver the error reply is ignored.
let _ = nostr
.send_error_response_private_message(
&event.pubkey,
error,
&event.message_type,
)
.await;
return Ok(());
}
};
debug!("Successfully parsed request metadata");
// Handler errors are logged only; the closure still returns Ok.
match request_type {
PrivateRequest::Spawn(spawn_req) => {
if let Err(e) = handle_spawn_request(
backend.as_ref(),
&config,
&nostr,
redeemer.as_ref(),
&workloads,
&stats,
&state_machine,
&standby_slots,
&event.pubkey,
&event.message_type,
spawn_req,
)
.await
{
error!("Failed to handle spawn request: {}", e);
}
}
PrivateRequest::Status(status_req) => {
if let Err(e) = handle_status_request(
&config,
&nostr,
&workloads,
&event.pubkey,
&event.message_type,
status_req,
)
.await
{
error!("Failed to handle status request: {}", e);
}
}
PrivateRequest::TopUp(topup_req) => {
if let Err(e) = handle_topup_request(
&config,
&nostr,
redeemer.as_ref(),
&workloads,
&event.pubkey,
&event.message_type,
topup_req,
)
.await
{
error!("Failed to handle topup request: {}", e);
}
}
}
Ok(())
})
})
.await?;
Ok(())
}
/// Every 15 seconds: drains the buffered heartbeat observations, feeds them
/// to the workload state machine, and handles each event the tick produces.
///
/// Neither the observation-buffer lock nor the state-machine lock is held
/// while events are being handled.
///
/// # Errors
/// Fails only if the system clock is before the Unix epoch.
async fn orchestrator_loop(&self) -> Result<()> {
    let cadence = tokio::time::Duration::from_secs(15);
    info!("Orchestrator loop starting (cadence: 15s)");
    loop {
        tokio::time::sleep(cadence).await;
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_secs();

        // Take the whole buffer, leaving an empty Vec behind.
        let drained: Vec<HeartbeatObservation> =
            std::mem::take(&mut *self.observation_buffer.lock().await);
        let events = self.state_machine.lock().await.tick(now, &drained);

        for event in events {
            self.handle_state_machine_event(event, now).await;
        }
    }
}
/// Reacts to a single event emitted by the workload state machine.
///
/// - `EnteredLive` / `EnteredSuspect` / `Evicted`: logged only.
/// - `PublishLeaseRevocation`: publishes a revocation naming the consumer
///   workload id (or `vmid-<id>` when none is stored) and the standbys.
/// - `AttemptRespawn`: respawn is not implemented; the state machine is told
///   the attempt failed straight away.
/// - `Failed`: the workload is dropped from `active_workloads`.
///
/// `now` is the Unix-seconds timestamp of the tick that produced the event.
async fn handle_state_machine_event(&self, event: StateMachineEvent, now: u64) {
    match event {
        StateMachineEvent::EnteredLive { workload_id } => {
            info!("Workload {} entered Live", workload_id);
        }
        StateMachineEvent::EnteredSuspect { workload_id } => {
            warn!(
                "Workload {} entered Suspect (heartbeat quorum lost)",
                workload_id
            );
        }
        StateMachineEvent::Evicted {
            workload_id,
            reason,
        } => {
            error!("Workload {} evicted: {}", workload_id, reason);
        }
        StateMachineEvent::PublishLeaseRevocation {
            workload_id,
            standby_providers,
        } => {
            // Read what we need under the lock, then release it before publishing.
            let (consumer_workload_id, state_uri) = {
                let active = self.active_workloads.lock().await;
                let looked_up = match active.get(&workload_id) {
                    Some(info) => (
                        info.consumer_workload_id
                            .clone()
                            .unwrap_or_else(|| format!("vmid-{}", workload_id)),
                        info.state_uri.clone(),
                    ),
                    None => (format!("vmid-{}", workload_id), None),
                };
                looked_up
            };
            let standby_total = standby_providers.len();
            let revocation = LeaseRevocationContent {
                workload_id: consumer_workload_id.clone(),
                primary_provider_npub: self.get_npub(),
                standby_providers,
                reason: "heartbeat-quorum-lost-past-t2".to_string(),
                revoked_at: now,
                state_uri,
                version: crate::nostr::SCHEMA_VERSION,
            };
            match self.nostr.publish_lease_revocation(revocation).await {
                Ok(event_id) => info!(
                    "Published lease revocation for workload {} (vmid {}) to {} standby(s): {}",
                    consumer_workload_id,
                    workload_id,
                    standby_total,
                    event_id
                ),
                Err(e) => error!(
                    "Failed to publish lease revocation for workload {}: {}",
                    workload_id, e
                ),
            }
        }
        StateMachineEvent::AttemptRespawn {
            workload_id,
            attempt,
        } => {
            info!(
                "Attempting respawn of workload {} (attempt {})",
                workload_id, attempt
            );
            // No respawn path exists yet, so report failure immediately.
            self.state_machine.lock().await.notify_respawn_failed(
                workload_id,
                "respawn handler not yet implemented (follow-up)",
            );
        }
        StateMachineEvent::Failed {
            workload_id,
            reason,
        } => {
            error!("Workload {} marked Failed: {}", workload_id, reason);
            self.active_workloads.lock().await.remove(&workload_id);
        }
    }
}
/// Every 30 seconds: tears down workloads whose lease has expired and drops
/// expired standby slots.
///
/// Expired workloads are removed from `active_workloads` under the lock, and
/// the lock is released *before* the backend stop/delete calls, so slow
/// backend operations no longer block spawn, top-up and heartbeat handling.
///
/// For each expired workload the container is stopped, then (only if the stop
/// succeeded) deleted; the state machine stops tracking it either way.
/// `total_jobs_completed` is incremented only when both steps succeed.
///
/// # Errors
/// Fails only if the system clock is before the Unix epoch.
async fn cleanup_loop(&self) -> Result<()> {
    let interval = tokio::time::Duration::from_secs(30);
    loop {
        tokio::time::sleep(interval).await;
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_secs();

        // Select and remove expired entries in one critical section so a
        // concurrent top-up cannot slip in between the two steps.
        let expired: Vec<u32> = {
            let mut workloads = self.active_workloads.lock().await;
            let expired: Vec<u32> = workloads
                .iter()
                .filter(|(_, w)| w.expires_at <= now)
                .map(|(vmid, _)| *vmid)
                .collect();
            for vmid in &expired {
                workloads.remove(vmid);
            }
            expired
        };

        for vmid in expired {
            info!("Cleaning up expired workload: {}", vmid);
            // Delete only after a successful stop; a stop error is the overall result.
            let result = match self.backend.stop_container(vmid).await {
                Ok(_) => self.backend.delete_container(vmid).await,
                Err(e) => Err(e),
            };
            self.state_machine.lock().await.untrack(vmid);
            match result {
                Ok(_) => {
                    info!("Cleaned up workload {}", vmid);
                    self.stats.lock().await.total_jobs_completed += 1;
                }
                // NOTE(review): the entry is already gone from the map, so a
                // failed teardown is not retried on the next tick.
                Err(e) => error!("Failed to cleanup workload {}: {}", vmid, e),
            }
        }

        // Standby slots hold no backend resources; expiring one is just a map removal.
        self.standby_slots.lock().await.retain(|workload_id, slot| {
            let expired = slot.expires_at <= now;
            if expired {
                info!(
                    "Expiring standby slot for workload {} (index {}/{}, primary {}, expired at {})",
                    workload_id, slot.standby_index, slot.standby_count, slot.primary_npub, slot.expires_at
                );
            }
            !expired
        });
    }
}
/// Periodically checks whether the primaries behind local standby slots have
/// gone silent, and schedules a promotion for each slot whose primary has.
///
/// On every tick the slots are snapshotted, the latest heartbeats of their
/// distinct primaries are fetched in one batch, and each slot is measured
/// against `STANDBY_HEARTBEAT_SILENCE_SECS`. When no heartbeat (or one with a
/// zero timestamp) is known, silence is measured from the slot's creation
/// time. A failed batch query skips the tick.
///
/// # Errors
/// Fails only if the system clock is before the Unix epoch.
async fn standby_watchdog_loop(&self) -> Result<()> {
    let tick = tokio::time::Duration::from_secs(STANDBY_WATCHDOG_INTERVAL_SECS);
    info!(
        "Standby watchdog loop starting (cadence: {}s, silence threshold: {}s)",
        STANDBY_WATCHDOG_INTERVAL_SECS, STANDBY_HEARTBEAT_SILENCE_SECS
    );
    loop {
        tokio::time::sleep(tick).await;

        // Work on a snapshot so the slots lock is not held during the relay query.
        let snapshot: Vec<StandbySlot> =
            self.standby_slots.lock().await.values().cloned().collect();
        if snapshot.is_empty() {
            continue;
        }

        // Several slots may share a primary; query each primary once.
        let distinct_primaries: std::collections::HashSet<String> =
            snapshot.iter().map(|s| s.primary_npub.clone()).collect();
        let primary_npubs: Vec<String> = distinct_primaries.into_iter().collect();

        let heartbeats = match self.nostr.get_latest_heartbeats_multi(primary_npubs).await {
            Ok(hb) => hb,
            Err(e) => {
                warn!(
                    "standby watchdog: heartbeat batch query failed: {}; \
                     skipping this tick (will retry next interval)",
                    e
                );
                continue;
            }
        };

        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_secs();

        for slot in snapshot {
            let silence_baseline = match heartbeats.get(&slot.primary_npub).map(|hb| hb.timestamp)
            {
                Some(ts) if ts != 0 => ts,
                _ => slot.created_at,
            };
            if primary_is_silent(now, silence_baseline, STANDBY_HEARTBEAT_SILENCE_SECS) {
                let silence_secs = now.saturating_sub(silence_baseline);
                warn!(
                    "Primary {} silent for {}s on slot workload_id={} (threshold {}s); \
                     triggering standby promotion (assumes hard crash; deduped \
                     intra-process via slot.remove and inter-process via the \
                     promotion-announcement event published post-spawn)",
                    slot.primary_npub,
                    silence_secs,
                    slot.workload_id,
                    STANDBY_HEARTBEAT_SILENCE_SECS
                );
                schedule_standby_promotion(
                    self.backend.clone(),
                    self.active_workloads.clone(),
                    self.state_machine.clone(),
                    self.standby_slots.clone(),
                    self.nostr.clone(),
                    slot,
                );
            }
        }
    }
}
}
/// Seconds the standby watchdog sleeps between ticks.
const STANDBY_WATCHDOG_INTERVAL_SECS: u64 = 30;
/// Seconds without a primary heartbeat after which the watchdog treats the
/// primary as silent and schedules a standby promotion.
const STANDBY_HEARTBEAT_SILENCE_SECS: u64 = 180;
/// Reports whether at least `threshold` seconds have passed since `baseline`.
///
/// A `baseline` of zero means "no reference point" and is never silent. A
/// `baseline` later than `now` counts as zero elapsed seconds.
fn primary_is_silent(now: u64, baseline: u64, threshold: u64) -> bool {
    baseline != 0 && now.saturating_sub(baseline) >= threshold
}
/// Handles one spawn request end to end.
///
/// Flow:
/// 1. Reject warm-standby requests not addressed to this provider.
/// 2. Resolve the pod spec (falling back to the first configured spec) and
///    the optional template. Both are checked *before* the Cashu token is
///    redeemed so an invalid request does not consume the payment.
/// 3. Redeem the token and convert the amount into a lease duration.
/// 4. Allocate a backend id, derive the host port and build the container
///    configuration.
/// 5. For a standby role: reserve a `StandbySlot` and reply without creating
///    a container. Otherwise: create the container, register it in
///    `workloads` and the state machine, and send access details.
///
/// Request-level problems are answered with an error response to the
/// requester and the function returns `Ok(())`.
///
/// # Errors
/// Returns `Err` only when sending a response fails or the system clock is
/// before the Unix epoch.
async fn handle_spawn_request(
    backend: &dyn ComputeBackend,
    config: &ProviderConfig,
    nostr: &NostrRelaySubscriber,
    redeemer: &dyn MintRedeemer,
    workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
    stats: &Arc<Mutex<ProviderStats>>,
    state_machine: &Arc<Mutex<WorkloadStateMachine>>,
    standby_slots: &Arc<Mutex<HashMap<String, StandbySlot>>>,
    requester_pubkey: &str,
    message_type: &str,
    request: EncryptedSpawnPodRequest,
) -> Result<()> {
    info!(
        "Processing spawn request from {} (tier: {:?})",
        requester_pubkey, request.pod_spec_id
    );

    let role = compute_warm_standby_role(&nostr.get_service_public_key(), &request);
    let is_warm_standby_request = matches!(
        request.replication.as_ref(),
        Some(crate::durable_workload::ReplicationMode::WarmStandby { .. })
    );
    if matches!(role, WarmStandbyRole::NotAddressed) && is_warm_standby_request {
        let err_msg =
            "warm-standby spawn arrived at a provider not designated as primary or standby";
        warn!("{}", err_msg);
        nostr
            .send_error_response(
                requester_pubkey,
                "not_addressed",
                err_msg,
                None,
                message_type,
            )
            .await?;
        return Ok(());
    }

    // Resolve the spec before touching the payment: an unknown id falls back
    // to the first configured spec, and only an empty spec list is an error.
    let requested_spec_id = request.pod_spec_id.as_deref();
    let spec = match config
        .specs
        .iter()
        .find(|s| requested_spec_id == Some(s.id.as_str()))
        .or_else(|| config.specs.first())
    {
        Some(s) => s,
        None => {
            let err_msg = "No pod specifications available on this provider";
            error!("{}", err_msg);
            nostr
                .send_error_response(requester_pubkey, "no_specs", err_msg, None, message_type)
                .await?;
            return Ok(());
        }
    };

    // A zero rate would make the duration computation below divide by zero.
    if spec.rate_msats_per_sec == 0 {
        let err_msg = format!(
            "Spec `{}` has a zero rate; provider misconfiguration",
            spec.id
        );
        error!("{}", err_msg);
        nostr
            .send_error_response(
                requester_pubkey,
                "spec_unavailable",
                &err_msg,
                None,
                message_type,
            )
            .await?;
        return Ok(());
    }

    // Template lookup is pure, so it also runs before the token is redeemed.
    let template = if let Some(slug) = request.template_slug.as_deref() {
        match TemplateName::from_slug(slug) {
            Some(name) => Some(TemplateDefinition::lookup(name)),
            None => {
                let err_msg = format!(
                    "Unknown template `{}` — provider does not advertise it",
                    slug
                );
                warn!("{}", err_msg);
                nostr
                    .send_error_response(
                        requester_pubkey,
                        "unknown_template",
                        &err_msg,
                        None,
                        message_type,
                    )
                    .await?;
                return Ok(());
            }
        }
    } else {
        None
    };

    let payment_msats = match validate_and_redeem(
        redeemer,
        &config.whitelisted_mints,
        &request.cashu_token,
    )
    .await
    {
        Ok(v) => v,
        Err(e) => {
            let (error_type, err_msg) = redeem_error_to_response(&e);
            error!("Cashu redemption failed: {}", err_msg);
            nostr
                .send_error_response(requester_pubkey, error_type, &err_msg, None, message_type)
                .await?;
            return Ok(());
        }
    };

    // Non-zero rate was established above, so this cannot divide by zero.
    let duration_secs = payment_msats / spec.rate_msats_per_sec;
    if duration_secs < config.minimum_duration_seconds {
        let err_msg = format!(
            "Insufficient payment for minimum duration. Required: {} msats for {}s",
            config
                .minimum_duration_seconds
                .saturating_mul(spec.rate_msats_per_sec),
            config.minimum_duration_seconds
        );
        warn!("{}", err_msg);
        nostr
            .send_error_response(
                requester_pubkey,
                "insufficient_payment",
                &err_msg,
                None,
                message_type,
            )
            .await?;
        return Ok(());
    }
    info!(
        "Validated payment: {} msats for {}s on tier {}",
        payment_msats, duration_secs, spec.name
    );

    let id = match backend
        .find_available_id(config.vmid_range_start, config.vmid_range_end)
        .await
    {
        Ok(id) => id,
        Err(e) => {
            let err_msg = format!("Failed to find available ID: {}", e);
            error!("{}", err_msg);
            nostr
                .send_error_response(
                    requester_pubkey,
                    "provisioning_error",
                    &err_msg,
                    None,
                    message_type,
                )
                .await?;
            return Ok(());
        }
    };

    let password = generate_password();

    // With a configured base port the offset is the id's position in the vmid
    // range. Every step is checked: an id below the range start, an offset
    // beyond u16, or a sum past 65535 is rejected instead of wrapping.
    let derived_port = match config.ssh_port_start {
        Some(start) => id
            .checked_sub(config.vmid_range_start)
            .filter(|offset| *offset <= u32::from(u16::MAX))
            .and_then(|offset| start.checked_add(offset as u16)),
        None => Some(30000 + (id % 10000) as u16),
    };
    let host_port = match derived_port {
        Some(port) => port,
        None => {
            let err_msg = format!(
                "Cannot derive a host port for workload id {} from ssh_port_start {:?}",
                id, config.ssh_port_start
            );
            error!("{}", err_msg);
            nostr
                .send_error_response(
                    requester_pubkey,
                    "provisioning_error",
                    &err_msg,
                    None,
                    message_type,
                )
                .await?;
            return Ok(());
        }
    };

    let image = template
        .as_ref()
        .map(|t| t.image.to_string())
        .unwrap_or_else(|| request.pod_image.clone());

    // Template ports are mapped to the host ports right after the SSH port.
    let template_ports: Vec<PortMapping> = template
        .as_ref()
        .map(|t| {
            t.ports
                .iter()
                .enumerate()
                .map(|(i, p)| PortMapping {
                    host_port: host_port.saturating_add(1 + i as u16),
                    container_port: p.container_port,
                    protocol: "tcp",
                })
                .collect()
        })
        .unwrap_or_default();

    let mut template_env: HashMap<String, String> = template
        .as_ref()
        .map(|t| {
            t.env
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect()
        })
        .unwrap_or_default();
    // Templates that declare EXEC_USER / EXEC_PASS get the generated credentials.
    if let Some(t) = template.as_ref() {
        if t.env.contains_key("EXEC_USER") {
            template_env.insert("EXEC_USER".to_string(), "root".to_string());
        }
        if t.env.contains_key("EXEC_PASS") {
            template_env.insert("EXEC_PASS".to_string(), password.clone());
        }
    }

    let extra_runtime_args: Vec<String> = template
        .as_ref()
        .map(|t| t.extra_docker_args.iter().map(|s| s.to_string()).collect())
        .unwrap_or_default();
    let data_path: Option<String> = template
        .as_ref()
        .and_then(|t| t.data_path.map(|p| p.to_string()));

    // Volume encryption only applies when the template has a data path.
    let volume_encryption_key = match (&data_path, request.volume_encryption.as_ref()) {
        (Some(_), Some(ve)) => match ve.decoded_key() {
            Ok(key) => {
                info!(
                    "Spawn request includes volume_encryption (algorithm={}, version={}); will create LUKS-encrypted data volume",
                    ve.algorithm, ve.version
                );
                Some(key)
            }
            Err(e) => {
                error!(
                    "Rejecting spawn: malformed volume_encryption.key_b64: {}",
                    e
                );
                let err_payload = ErrorResponseContent {
                    error_type: "invalid_volume_encryption".to_string(),
                    message: format!("volume_encryption rejected: {}", e),
                    details: None,
                };
                // Best effort: a failure to deliver the error reply is ignored.
                let _ = nostr
                    .send_error_response_private_message(
                        requester_pubkey,
                        err_payload,
                        message_type,
                    )
                    .await;
                return Ok(());
            }
        },
        (None, Some(_)) => {
            warn!(
                "Spawn request set volume_encryption but template has no data_path; encryption is a no-op for stateless workloads"
            );
            None
        }
        _ => None,
    };

    let container_config = ContainerConfig {
        id,
        name: format!("paygress-{}", id),
        image,
        cpu_cores: (spec.cpu_millicores / 1000).max(1) as u32,
        memory_mb: spec.memory_mb as u32,
        storage_gb: 10,
        password: password.clone(),
        ssh_key: None,
        host_port: Some(host_port),
        template_ports,
        template_env,
        extra_runtime_args,
        data_path,
        volume_encryption_key,
    };

    // Standby role: reserve a slot and reply; no container is created now.
    if let WarmStandbyRole::Standby { index, count } = role {
        let workload_id = match request.workload_id.as_deref() {
            Some(wid) if !wid.is_empty() => wid.to_string(),
            _ => {
                let err_msg = "warm-standby spawn missing workload_id (consumer-assigned UUID required to coordinate primary + standbys)";
                warn!("{}", err_msg);
                nostr
                    .send_error_response(
                        requester_pubkey,
                        "missing_workload_id",
                        err_msg,
                        None,
                        message_type,
                    )
                    .await?;
                return Ok(());
            }
        };
        let primary_npub = request.primary_npub.clone().unwrap_or_default();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_secs();
        let expires_at = now.saturating_add(duration_secs);
        let self_npub = nostr.get_service_public_key();
        let peer_standby_npubs: Vec<String> = match request.replication.as_ref() {
            Some(crate::durable_workload::ReplicationMode::WarmStandby { standby_providers }) => {
                standby_providers
                    .iter()
                    .filter(|p| !crate::nostr::npubs_equal(p, &self_npub))
                    .cloned()
                    .collect()
            }
            _ => Vec::new(),
        };
        let slot = StandbySlot {
            workload_id: workload_id.clone(),
            primary_npub,
            standby_index: index,
            standby_count: count,
            container_config: container_config.clone(),
            spec_id: spec.id.clone(),
            expires_at,
            owner_npub: requester_pubkey.to_string(),
            created_at: now,
            peer_standby_npubs,
        };
        info!(
            "Reserved standby slot for workload_id={} (index {}/{}, expires at {})",
            workload_id, index, count, expires_at
        );
        standby_slots.lock().await.insert(workload_id.clone(), slot);

        let expires_dt =
            chrono::DateTime::from_timestamp(expires_at as i64, 0).unwrap_or_default();
        let details = AccessDetailsContent {
            pod_npub: format!("standby-slot-{}", workload_id),
            // No port is exposed for a reserved slot.
            node_port: 0,
            expires_at: expires_dt.to_rfc3339(),
            cpu_millicores: spec.cpu_millicores,
            memory_mb: spec.memory_mb,
            pod_spec_name: spec.name.clone(),
            pod_spec_description: spec.description.clone(),
            instructions: vec![
                format!(
                    "🛏️ Standby slot reserved (index {}/{} for workload {}).",
                    index, count, workload_id
                ),
                format!(
                    "Will promote on LeaseRevocation event from primary {}.",
                    request.primary_npub.as_deref().unwrap_or("(unset)")
                ),
                format!(
                    "Expected promotion delay: {} seconds (index * 30s backoff).",
                    index * 30
                ),
            ],
            host_address: config.public_ip.clone(),
            template_ports: Vec::new(),
        };
        nostr
            .send_access_details_private_message(requester_pubkey, details, message_type)
            .await?;
        return Ok(());
    }

    debug!("Calling backend.create_container for workload {}", id);
    if let Err(e) = backend.create_container(&container_config).await {
        let err_msg = format!("Backend failed to create workload: {}", e);
        error!("{}", err_msg);
        nostr
            .send_error_response(
                requester_pubkey,
                "backend_error",
                &err_msg,
                None,
                message_type,
            )
            .await?;
        return Ok(());
    }
    debug!("Successfully created container {}", id);

    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs();
    let replication = request
        .replication
        .clone()
        .unwrap_or_else(crate::durable_workload::ReplicationMode::default);
    let workload = WorkloadInfo {
        vmid: id,
        workload_type: "lxc".to_string(),
        spec_id: spec.id.clone(),
        created_at: now,
        expires_at: now.saturating_add(duration_secs),
        owner_npub: requester_pubkey.to_string(),
        replication,
        restart_policy: crate::durable_workload::RestartPolicy::default(),
        state_uri: None,
        consumer_workload_id: request.workload_id.clone().filter(|s| !s.is_empty()),
    };
    workloads.lock().await.insert(id, workload.clone());
    state_machine.lock().await.track(DurableWorkload {
        workload_id: id,
        provider_npub: nostr.get_service_public_key(),
        state: WorkloadState::Provisioning { since: now },
        replication: workload.replication.clone(),
        restart_policy: workload.restart_policy,
        state_uri: workload.state_uri.clone(),
        created_at: now,
        expires_at: workload.expires_at,
    });
    // NOTE(review): the cleanup loop also increments this counter when the
    // workload is torn down — confirm whether counting at spawn is intended.
    {
        let mut s = stats.lock().await;
        s.total_jobs_completed += 1;
    }

    let host = &config.public_ip;
    // Attach the template's label to each mapped port, falling back to
    // `port-<container_port>` when the template has no matching entry.
    let template_access_ports: Vec<crate::nostr::TemplateAccessPort> = container_config
        .template_ports
        .iter()
        .map(|p| {
            let label = template
                .as_ref()
                .and_then(|t| {
                    t.ports
                        .iter()
                        .find(|tp| tp.container_port == p.container_port)
                })
                .map(|tp| tp.label.to_string())
                .unwrap_or_else(|| format!("port-{}", p.container_port));
            crate::nostr::TemplateAccessPort {
                host_port: p.host_port,
                container_port: p.container_port,
                protocol: p.protocol.to_string(),
                label,
            }
        })
        .collect();

    let expires_dt =
        chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
    let mut instructions = vec![
        "🚀 Workload provisioned successfully!".to_string(),
        "👤 Username: root".to_string(),
        format!("🔑 Password: {}", password),
        format!("⌛ Expires: {}", expires_dt.format("%Y-%m-%d %H:%M:%S UTC")),
        "Access: You can connect to the container using SSH.".to_string(),
        format!("  ssh -p {} root@{}", host_port, host),
    ];
    if !template_access_ports.is_empty() {
        instructions.push("Workload ports:".to_string());
        for p in &template_access_ports {
            instructions.push(format!(
                "  {} ({}): {}://{}:{}",
                p.label, p.protocol, p.protocol, host, p.host_port
            ));
        }
    }

    let details = AccessDetailsContent {
        pod_npub: format!("container-{}", id),
        node_port: host_port,
        expires_at: expires_dt.to_rfc3339(),
        cpu_millicores: spec.cpu_millicores,
        memory_mb: spec.memory_mb,
        pod_spec_name: spec.name.clone(),
        pod_spec_description: spec.description.clone(),
        instructions,
        host_address: host.clone(),
        template_ports: template_access_ports,
    };
    debug!("Sending access details to {}", requester_pubkey);
    nostr
        .send_access_details_private_message(requester_pubkey, details, message_type)
        .await?;
    debug!("Access details sent successfully");
    info!("Workload {} provisioned for {} seconds", id, duration_secs);
    Ok(())
}
/// Handles a private top-up request: extends the lease of an existing pod in
/// exchange for a Cashu token.
///
/// Processing order:
/// 1. Parse `request.pod_npub` into a vmid with `parse_pod_npub`.
/// 2. Look the pod up in `workloads`; it must exist, be owned by
///    `requester_pubkey`, and have an `expires_at` that is still in the future.
/// 3. Resolve the pod's spec from `config.specs` and reject a zero rate.
/// 4. Redeem the token through `validate_and_redeem` against
///    `config.whitelisted_mints`.
/// 5. Convert the redeemed msats into whole seconds at the spec's rate and add
///    them to the pod's `expires_at`.
/// 6. Send a `TopUpResponseContent` to the requester.
///
/// Every rejection is reported to the requester with `send_error_response`,
/// after which the function returns `Ok(())`.
///
/// # Errors
/// Returns `Err` only when sending a message through `nostr` fails, or when
/// the system clock reads earlier than the Unix epoch.
async fn handle_topup_request(
    config: &ProviderConfig,
    nostr: &NostrRelaySubscriber,
    redeemer: &dyn MintRedeemer,
    workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
    requester_pubkey: &str,
    message_type: &str,
    request: EncryptedTopUpPodRequest,
) -> Result<()> {
    info!(
        "Processing topup request from {} for {}",
        requester_pubkey, request.pod_npub
    );
    let vmid = match parse_pod_npub(&request.pod_npub) {
        Some(v) => v,
        None => {
            let err_msg = format!(
                "Could not parse pod identifier `{}`; expected `container-<id>` or numeric id",
                request.pod_npub
            );
            warn!("{}", err_msg);
            nostr
                .send_error_response(
                    requester_pubkey,
                    "invalid_pod_id",
                    &err_msg,
                    None,
                    message_type,
                )
                .await?;
            return Ok(());
        }
    };
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs();
    // Snapshot the fields we need and release the lock before any further
    // awaiting; the guard is dropped explicitly on the error paths so it is
    // not held while the error response is sent.
    let (spec_id, current_expires_at) = {
        let lock = workloads.lock().await;
        match lock.get(&vmid) {
            Some(w) if w.owner_npub == requester_pubkey => (w.spec_id.clone(), w.expires_at),
            Some(_) => {
                drop(lock);
                let err_msg = "Pod not owned by requester";
                warn!("{}: vmid={}", err_msg, vmid);
                nostr
                    .send_error_response(requester_pubkey, "not_owner", err_msg, None, message_type)
                    .await?;
                return Ok(());
            }
            None => {
                drop(lock);
                let err_msg = format!("Pod {} not found", request.pod_npub);
                warn!("{}", err_msg);
                nostr
                    .send_error_response(
                        requester_pubkey,
                        "not_found",
                        &err_msg,
                        None,
                        message_type,
                    )
                    .await?;
                return Ok(());
            }
        }
    };
    if current_expires_at <= now {
        let err_msg = format!(
            "Pod {} lease has already expired; spawn a new pod instead",
            request.pod_npub
        );
        warn!("{}", err_msg);
        nostr
            .send_error_response(
                requester_pubkey,
                "lease_expired",
                &err_msg,
                None,
                message_type,
            )
            .await?;
        return Ok(());
    }
    let spec = match config.specs.iter().find(|s| s.id == spec_id) {
        Some(s) => s.clone(),
        None => {
            let err_msg = format!(
                "Pod {} references unknown spec `{}`; provider misconfiguration",
                request.pod_npub, spec_id
            );
            error!("{}", err_msg);
            nostr
                .send_error_response(
                    requester_pubkey,
                    "spec_unavailable",
                    &err_msg,
                    None,
                    message_type,
                )
                .await?;
            return Ok(());
        }
    };
    // A zero rate would make the division below panic *after* the token has
    // been redeemed, so reject the request while the token is still unspent.
    if spec.rate_msats_per_sec == 0 {
        let err_msg = format!(
            "Pod {} uses spec `{}` with a zero rate; provider misconfiguration",
            request.pod_npub, spec_id
        );
        error!("{}", err_msg);
        nostr
            .send_error_response(
                requester_pubkey,
                "spec_unavailable",
                &err_msg,
                None,
                message_type,
            )
            .await?;
        return Ok(());
    }
    let payment_msats = match validate_and_redeem(
        redeemer,
        &config.whitelisted_mints,
        &request.cashu_token,
    )
    .await
    {
        Ok(v) => v,
        Err(e) => {
            let (error_type, err_msg) = redeem_error_to_response(&e);
            error!("Topup redemption failed: {}", err_msg);
            nostr
                .send_error_response(requester_pubkey, error_type, &err_msg, None, message_type)
                .await?;
            return Ok(());
        }
    };
    // Integer division: any remainder smaller than one second's worth is not credited.
    let extension_secs = payment_msats / spec.rate_msats_per_sec;
    if extension_secs == 0 {
        let err_msg = format!(
            "Insufficient topup: {} msats buys 0 seconds at {} msats/sec",
            payment_msats, spec.rate_msats_per_sec
        );
        warn!("{}", err_msg);
        nostr
            .send_error_response(
                requester_pubkey,
                "insufficient_payment",
                &err_msg,
                None,
                message_type,
            )
            .await?;
        return Ok(());
    }
    // The lock was released while the token was being redeemed, so the pod is
    // looked up (and its ownership re-checked) again before it is mutated.
    let new_expires_at = {
        let mut lock = workloads.lock().await;
        match lock.get_mut(&vmid) {
            Some(w) if w.owner_npub == requester_pubkey => {
                w.expires_at = w.expires_at.saturating_add(extension_secs);
                w.expires_at
            }
            _ => {
                drop(lock);
                let err_msg =
                    "Pod was cleaned up before topup could be applied; token has been spent";
                error!("{}: vmid={}", err_msg, vmid);
                nostr
                    .send_error_response(requester_pubkey, "race_lost", err_msg, None, message_type)
                    .await?;
                return Ok(());
            }
        }
    };
    // Falls back to the default `DateTime` if the timestamp is out of chrono's range.
    let new_expires_dt =
        chrono::DateTime::from_timestamp(new_expires_at as i64, 0).unwrap_or_default();
    let response = TopUpResponseContent {
        success: true,
        pod_npub: request.pod_npub.clone(),
        extended_duration_seconds: extension_secs,
        new_expires_at: new_expires_dt.to_rfc3339(),
        message: format!(
            "Lease extended by {}s ({} msats @ {} msats/sec)",
            extension_secs, payment_msats, spec.rate_msats_per_sec
        ),
    };
    nostr
        .send_topup_response_private_message(requester_pubkey, response, message_type)
        .await?;
    info!(
        "Topup applied to {}: +{}s (now expires at {})",
        request.pod_npub, extension_secs, new_expires_at
    );
    Ok(())
}
/// Generates a random 16-character password drawn from ASCII letters and
/// digits, using the thread-local RNG from the `rand` crate.
fn generate_password() -> String {
    use rand::Rng;
    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    const PASSWORD_LEN: usize = 16;

    let mut rng = rand::thread_rng();
    let mut password = String::with_capacity(PASSWORD_LEN);
    for _ in 0..PASSWORD_LEN {
        // One uniform draw per character, each an index into CHARSET.
        let pick = rng.gen_range(0..CHARSET.len());
        password.push(CHARSET[pick] as char);
    }
    password
}
/// Extracts the numeric workload id from a pod identifier.
///
/// Accepts either `container-<id>` or a bare `<id>`. Returns `None` when the
/// part that should be numeric does not parse as a `u32`.
pub fn parse_pod_npub(pod_npub: &str) -> Option<u32> {
    // Drop the optional prefix once; whatever remains must be the number.
    let digits = pod_npub.strip_prefix("container-").unwrap_or(pod_npub);
    digits.parse::<u32>().ok()
}
/// Maps a redemption failure to the `(error_type, message)` pair that is sent
/// back to the requester in an error response.
fn redeem_error_to_response(err: &RedeemError) -> (&'static str, String) {
    match err {
        RedeemError::InvalidToken(reason) => (
            "invalid_token",
            format!("Invalid Cashu token: {}", reason),
        ),
        RedeemError::NonWhitelistedMint { mint_url: url } => (
            "non_whitelisted_mint",
            format!("Mint {} is not accepted by this provider", url),
        ),
        RedeemError::AlreadySpent => (
            "token_already_spent",
            String::from("This Cashu token has already been spent at the mint"),
        ),
        RedeemError::Pending => (
            "token_pending",
            String::from("Token is pending at the mint; retry shortly"),
        ),
        RedeemError::Network(reason) => (
            "mint_network_error",
            format!("Could not reach mint: {}", reason),
        ),
        RedeemError::UnsupportedUnit(token_unit) => (
            "unsupported_unit",
            format!("Token unit {} is not supported", token_unit),
        ),
        RedeemError::MintError(reason) => (
            "mint_error",
            format!("Mint rejected redemption: {}", reason),
        ),
    }
}
async fn handle_status_request(
config: &ProviderConfig,
nostr: &NostrRelaySubscriber,
workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
requester_pubkey: &str,
message_type: &str,
request: StatusRequestContent,
) -> Result<()> {
info!(
"Processing status request for pod {} from {}",
request.pod_id, requester_pubkey
);
let vmid = request.pod_id.parse::<u32>().ok();
let workload = {
let lock = workloads.lock().await;
if let Some(vmid) = vmid {
lock.get(&vmid).cloned()
} else {
lock.values()
.find(|w| w.owner_npub == request.pod_id || w.owner_npub == requester_pubkey)
.cloned()
}
};
let workload = match workload {
Some(w) => w,
None => {
let err_msg = format!(
"Workload {} not found or you don't have access",
request.pod_id
);
warn!("{}", err_msg);
nostr
.send_error_response(requester_pubkey, "not_found", &err_msg, None, message_type)
.await?;
return Ok(());
}
};
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
let time_remaining = workload.expires_at.saturating_sub(now);
let status = if time_remaining == 0 {
"Expired"
} else {
"Running"
};
let expires_dt =
chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
let spec = config.specs.iter().find(|s| s.id == workload.spec_id);
let cpu = spec.map(|s| s.cpu_millicores).unwrap_or(1000);
let mem = spec.map(|s| s.memory_mb).unwrap_or(1024);
let host_port = match config.ssh_port_start {
Some(start) => start + (workload.vmid - config.vmid_range_start) as u16,
None => (30000 + (workload.vmid % 10000)) as u16,
};
let response = StatusResponseContent {
pod_id: workload.vmid.to_string(),
status: status.to_string(),
expires_at: expires_dt.to_rfc3339(),
time_remaining_seconds: time_remaining,
cpu_millicores: cpu,
memory_mb: mem,
ssh_host: config.public_ip.clone(),
ssh_port: host_port,
ssh_username: "root".to_string(),
};
nostr
.send_status_response(requester_pubkey, response, message_type)
.await?;
info!("Status response sent for workload {}", workload.vmid);
Ok(())
}
pub fn load_config(path: &str) -> Result<ProviderConfig> {
let content =
std::fs::read_to_string(path).context(format!("Failed to read config file: {}", path))?;
serde_json::from_str(&content).context("Failed to parse provider config")
}
pub fn save_config(path: &str, config: &ProviderConfig) -> Result<()> {
let content = serde_json::to_string_pretty(config)?;
std::fs::write(path, content).context(format!("Failed to write config file: {}", path))?;
Ok(())
}
/// Back-off step, in seconds, between standbys attempting promotion: a standby
/// waits `standby_index * STANDBY_PROMOTION_DELAY_SECS` before it tries to
/// promote itself, so the standby at index 0 does not wait at all.
const STANDBY_PROMOTION_DELAY_SECS: u64 = 30;
/// Determines this provider's role for a spawn request.
///
/// Requests that do not use warm-standby replication always yield
/// `WarmStandbyRole::Primary`. For warm-standby requests the decision is
/// delegated to `warm_standby_role`, with a missing `primary_npub` passed as
/// the empty string.
fn compute_warm_standby_role(
    self_npub: &str,
    request: &EncryptedSpawnPodRequest,
) -> WarmStandbyRole {
    use crate::durable_workload::ReplicationMode;

    if let Some(ReplicationMode::WarmStandby { standby_providers }) = &request.replication {
        let designated_primary = request.primary_npub.as_deref().unwrap_or("");
        return warm_standby_role(self_npub, designated_primary, standby_providers);
    }
    WarmStandbyRole::Primary
}
/// Spawns a detached background task that promotes a warm-standby slot to a
/// locally running workload.
///
/// The `slot` argument only supplies the workload id and the standby index
/// (used for the back-off); the slot that is actually promoted is the one
/// taken out of `standby_slots` after the back-off. The task:
///
/// 1. Sleeps `standby_index * STANDBY_PROMOTION_DELAY_SECS` seconds.
/// 2. Removes the slot for the workload from `standby_slots`; if it is no
///    longer there, the task ends.
/// 3. When the slot lists peer standbys, queries their promotion
///    announcements. If a peer has already promoted, the task ends and the
///    slot stays removed. A failed query is logged and promotion continues.
/// 4. Creates the container through `backend`. On failure the slot is put
///    back into `standby_slots` and the task ends.
/// 5. Records the workload in `workloads` and starts tracking it in
///    `state_machine` in the `Provisioning` state.
/// 6. Publishes a promotion announcement; a publish failure is only logged.
fn schedule_standby_promotion(
    backend: Arc<dyn ComputeBackend>,
    workloads: Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
    state_machine: Arc<Mutex<WorkloadStateMachine>>,
    standby_slots: Arc<Mutex<HashMap<String, StandbySlot>>>,
    nostr: NostrRelaySubscriber,
    slot: StandbySlot,
) {
    let delay_secs = (slot.standby_index as u64).saturating_mul(STANDBY_PROMOTION_DELAY_SECS);
    let workload_id = slot.workload_id.clone();
    let standby_index = slot.standby_index;
    info!(
        "Scheduling standby promotion for workload {} after {}s backoff (standby index {})",
        workload_id, delay_secs, standby_index
    );
    tokio::spawn(async move {
        if delay_secs > 0 {
            tokio::time::sleep(std::time::Duration::from_secs(delay_secs)).await;
        }
        // Take the slot out of the map so that only one task can promote it.
        let still_present = {
            let mut slots = standby_slots.lock().await;
            slots.remove(&workload_id)
        };
        let slot = match still_present {
            Some(s) => s,
            None => {
                debug!(
                    "Standby slot for workload {} already drained; skipping promotion",
                    workload_id
                );
                return;
            }
        };
        if !slot.peer_standby_npubs.is_empty() {
            match nostr
                .query_standby_promotion_announcements(&slot.workload_id, &slot.peer_standby_npubs)
                .await
            {
                Ok(Some(announcement)) => {
                    info!(
                        "Peer standby {} already promoted workload {} at {}; dropping slot without spawning",
                        announcement.new_primary_npub,
                        announcement.workload_id,
                        announcement.promoted_at
                    );
                    return;
                }
                Ok(None) => {}
                Err(e) => {
                    warn!(
                        "Failed to query peer promotion announcements for workload {}: {}; proceeding with promotion (best-effort)",
                        slot.workload_id, e
                    );
                }
            }
        }
        info!(
            "Promoting standby slot {} → primary (vmid {})",
            slot.workload_id, slot.container_config.id
        );
        if let Err(e) = backend.create_container(&slot.container_config).await {
            error!(
                "Standby promotion failed for workload {}: backend error: {}",
                slot.workload_id, e
            );
            // Put the slot back so it is not lost after a failed attempt.
            standby_slots
                .lock()
                .await
                .insert(slot.workload_id.clone(), slot);
            return;
        }
        // A clock reading before the Unix epoch is recorded as 0 rather than aborting.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        let workload = WorkloadInfo {
            vmid: slot.container_config.id,
            workload_type: "lxc".to_string(),
            spec_id: slot.spec_id.clone(),
            created_at: now,
            expires_at: slot.expires_at,
            owner_npub: slot.owner_npub.clone(),
            consumer_workload_id: Some(slot.workload_id.clone()),
            replication: crate::durable_workload::ReplicationMode::None,
            restart_policy: crate::durable_workload::RestartPolicy::default(),
            state_uri: None,
        };
        workloads
            .lock()
            .await
            .insert(slot.container_config.id, workload.clone());
        state_machine
            .lock()
            .await
            .track(crate::durable_workload::DurableWorkload {
                workload_id: slot.container_config.id,
                provider_npub: String::new(),
                state: crate::durable_workload::WorkloadState::Provisioning { since: now },
                replication: workload.replication.clone(),
                restart_policy: workload.restart_policy,
                state_uri: workload.state_uri.clone(),
                created_at: now,
                expires_at: workload.expires_at,
            });
        info!(
            "Standby promotion complete: workload {} now running locally (vmid {})",
            slot.workload_id, slot.container_config.id
        );
        let announcement = StandbyPromotionAnnouncementContent {
            workload_id: slot.workload_id.clone(),
            new_primary_npub: nostr.get_service_public_key(),
            promoted_at: now,
            version: crate::nostr::SCHEMA_VERSION,
        };
        if let Err(e) = nostr
            .publish_standby_promotion_announcement(announcement)
            .await
        {
            warn!(
                "Post-promotion announcement publish failed for workload {}: {}; peer standbys will not back off and may produce a duplicate primary",
                slot.workload_id, e
            );
        }
    });
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::durable_workload::ReplicationMode;

    /// Spawn-request fixture; only the replication mode and the designated
    /// primary vary between tests.
    fn req_with(
        replication: Option<ReplicationMode>,
        primary_npub: Option<&str>,
    ) -> EncryptedSpawnPodRequest {
        EncryptedSpawnPodRequest {
            cashu_token: String::from("tok"),
            pod_spec_id: Some(String::from("basic")),
            pod_image: String::from("ubuntu:22.04"),
            ssh_username: String::from("u"),
            ssh_password: String::from("p"),
            template_slug: None,
            replication,
            primary_npub: primary_npub.map(String::from),
            workload_id: Some(String::from("wid-test")),
            volume_encryption: None,
        }
    }

    /// Role of `self_npub` in a warm-standby topology whose primary is
    /// `npub1primary` and whose standbys are `npub1b` and `npub1c`, in that order.
    fn role_in_two_standby_topology(self_npub: &str) -> WarmStandbyRole {
        let topology = ReplicationMode::WarmStandby {
            standby_providers: vec![String::from("npub1b"), String::from("npub1c")],
        };
        compute_warm_standby_role(self_npub, &req_with(Some(topology), Some("npub1primary")))
    }

    #[test]
    fn role_is_primary_for_non_warm_standby() {
        let without_replication = compute_warm_standby_role("npub1self", &req_with(None, None));
        assert_eq!(without_replication, WarmStandbyRole::Primary);

        let checkpointed = compute_warm_standby_role(
            "npub1self",
            &req_with(Some(ReplicationMode::Checkpointed), None),
        );
        assert_eq!(checkpointed, WarmStandbyRole::Primary);
    }

    #[test]
    fn role_is_primary_when_self_is_designated_primary() {
        assert_eq!(
            role_in_two_standby_topology("npub1primary"),
            WarmStandbyRole::Primary
        );
    }

    #[test]
    fn role_is_standby_with_correct_index_when_self_in_list() {
        assert_eq!(
            role_in_two_standby_topology("npub1c"),
            WarmStandbyRole::Standby { index: 1, count: 2 }
        );
    }

    #[test]
    fn role_is_not_addressed_when_self_unknown_to_topology() {
        assert_eq!(
            role_in_two_standby_topology("npub1stranger"),
            WarmStandbyRole::NotAddressed
        );
    }

    #[test]
    fn fresh_primary_heartbeat_is_not_silent() {
        // Last heartbeat 60s ago, threshold 180s.
        assert!(!primary_is_silent(1_000_000, 999_940, 180));
    }

    #[test]
    fn primary_just_past_threshold_is_silent() {
        // Exactly 180s of silence trips the threshold; 179s does not.
        assert!(primary_is_silent(1_000_000, 999_820, 180));
        assert!(!primary_is_silent(1_000_000, 999_821, 180));
    }

    #[test]
    fn unset_baseline_is_not_silent() {
        assert!(!primary_is_silent(1_000_000, 0, 180));
        assert!(!primary_is_silent(50, 0, 180));
    }

    #[test]
    fn fresh_slot_within_grace_window_is_not_silent() {
        let slot_created = 1_000_000;
        assert!(!primary_is_silent(slot_created + 30, slot_created, 180));
    }

    #[test]
    fn fresh_slot_past_grace_window_is_silent() {
        let slot_created = 1_000_000;
        assert!(primary_is_silent(slot_created + 180, slot_created, 180));
    }

    #[test]
    fn clock_skew_underflow_does_not_panic_or_misfire() {
        // Baseline lies in the future relative to `now`.
        assert!(!primary_is_silent(100, 200, 180));
    }

    /// Standby-slot fixture; only the workload id and expiry vary.
    fn make_slot(workload_id: &str, expires_at: u64) -> StandbySlot {
        let container_config = ContainerConfig {
            id: 1,
            name: String::from("test"),
            image: String::from("img"),
            cpu_cores: 1,
            memory_mb: 1024,
            storage_gb: 10,
            password: String::from("p"),
            ssh_key: None,
            host_port: None,
            template_ports: Vec::new(),
            template_env: HashMap::new(),
            extra_runtime_args: Vec::new(),
            data_path: None,
            volume_encryption_key: None,
        };
        StandbySlot {
            workload_id: String::from(workload_id),
            primary_npub: String::from("npub1primary"),
            standby_index: 0,
            standby_count: 1,
            container_config,
            spec_id: String::from("basic"),
            expires_at,
            owner_npub: String::from("npub1owner"),
            created_at: 0,
            peer_standby_npubs: Vec::new(),
        }
    }

    /// Builds a slot map from `(workload_id, expires_at)` pairs.
    fn slots_with(entries: &[(&str, u64)]) -> HashMap<String, StandbySlot> {
        entries
            .iter()
            .map(|&(id, expires_at)| (id.to_string(), make_slot(id, expires_at)))
            .collect()
    }

    /// Ids of every slot whose expiry is at or before `now`.
    fn select_expired(slots: &HashMap<String, StandbySlot>, now: u64) -> Vec<String> {
        let mut expired_ids = Vec::new();
        for (workload_id, slot) in slots {
            if slot.expires_at <= now {
                expired_ids.push(workload_id.clone());
            }
        }
        expired_ids
    }

    #[test]
    fn select_expired_returns_only_past_expiry_slots() {
        let slots = slots_with(&[("active", 2_000), ("expired", 999)]);
        let mut expired = select_expired(&slots, 1_000);
        expired.sort();
        assert_eq!(expired, vec!["expired".to_string()]);
    }

    #[test]
    fn select_expired_treats_expires_at_equals_now_as_expired() {
        let slots = slots_with(&[("boundary", 1_000)]);
        let expired = select_expired(&slots, 1_000);
        assert_eq!(expired, vec!["boundary".to_string()]);
    }

    #[test]
    fn select_expired_returns_empty_when_no_slots_expired() {
        let slots = slots_with(&[("a", 9_999), ("b", 9_999)]);
        assert!(select_expired(&slots, 1_000).is_empty());
    }
}