// paygress/provider.rs
// Provider Service
//
// Runs on machine operator's server to:
// - Publish provider offer to Nostr
// - Send periodic heartbeats
// - Listen for and handle spawn requests

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};

use crate::compute::{ComputeBackend, ContainerConfig};
use crate::lxd::LxdBackend;
use crate::nostr::{
    parse_private_message_content, AccessDetailsContent, CapacityInfo, EncryptedSpawnPodRequest,
    ErrorResponseContent, HeartbeatContent, NostrRelaySubscriber, PodSpec, PrivateRequest,
    ProviderOfferContent, RelayConfig, StatusRequestContent, StatusResponseContent,
};
use crate::proxmox::{ProxmoxBackend, ProxmoxClient};

25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum BackendType {
27    Proxmox,
28    LXD,
29}
30
31impl Default for BackendType {
32    fn default() -> Self {
33        Self::Proxmox
34    }
35}
36
37
/// Provider configuration
///
/// Deserialized from a JSON file by [`load_config`]; all fields are
/// public so operators can edit them directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderConfig {
    /// Compute backend to drive (defaults to Proxmox).
    #[serde(default)]
    pub backend_type: BackendType,
    
    // Proxmox / Backend settings
    // NOTE: for the LXD backend, `proxmox_storage` and `proxmox_bridge`
    // are reused as the storage-pool and network names respectively.

    pub proxmox_url: String,
    pub proxmox_token_id: String,
    pub proxmox_token_secret: String,
    pub proxmox_node: String,
    pub proxmox_storage: String,
    pub proxmox_template: String,
    pub proxmox_bridge: String,
    // Inclusive container/VM ID range this provider allocates from.
    pub vmid_range_start: u32,
    pub vmid_range_end: u32,
    
    // Nostr settings
    pub nostr_private_key: String,
    pub nostr_relays: Vec<String>,
    
    // Provider metadata
    pub provider_name: String,
    pub provider_location: Option<String>,
    // Host advertised to clients for SSH access.
    pub public_ip: String,
    pub capabilities: Vec<String>,
    
    // Pricing & specs
    pub specs: Vec<PodSpec>,
    pub whitelisted_mints: Vec<String>,
    
    // Operational settings
    pub heartbeat_interval_secs: u64,
    // Spawn requests whose payment buys less than this are rejected.
    pub minimum_duration_seconds: u64,

    // Tunnel settings (for providers behind NAT)
    #[serde(default)]
    pub tunnel_enabled: bool,
    #[serde(default)]
    pub tunnel_interface: Option<String>,
    // When set, SSH host ports are assigned as
    // `ssh_port_start + (vmid - vmid_range_start)`.
    #[serde(default)]
    pub ssh_port_start: Option<u16>,
    #[serde(default)]
    pub ssh_port_end: Option<u16>,
}

85impl Default for ProviderConfig {
86    fn default() -> Self {
87        Self {
88            backend_type: BackendType::Proxmox,
89            proxmox_url: "https://localhost:8006/api2/json".to_string(),
90            proxmox_token_id: "root@pam!paygress".to_string(),
91            proxmox_token_secret: String::new(),
92            proxmox_node: "pve".to_string(),
93            proxmox_storage: "local-lvm".to_string(),
94            proxmox_template: "local:vztmpl/ubuntu-22.04-standard.tar.zst".to_string(),
95            proxmox_bridge: "vmbr0".to_string(),
96            vmid_range_start: 1000,
97            vmid_range_end: 1999,
98            nostr_private_key: String::new(),
99            nostr_relays: vec![
100                "wss://relay.damus.io".to_string(),
101                "wss://nos.lol".to_string(),
102            ],
103            provider_name: "Paygress Provider".to_string(),
104            provider_location: None,
105            public_ip: "127.0.0.1".to_string(),
106            capabilities: vec!["lxc".to_string()],
107            specs: vec![
108                PodSpec {
109                    id: "basic".to_string(),
110                    name: "Basic".to_string(),
111                    description: "1 vCPU, 1GB RAM".to_string(),
112                    cpu_millicores: 1000,
113                    memory_mb: 1024,
114                    rate_msats_per_sec: 50,
115                },
116            ],
117            whitelisted_mints: vec!["https://mint.minibits.cash".to_string()],
118            heartbeat_interval_secs: 60,
119            minimum_duration_seconds: 60,
120            tunnel_enabled: false,
121            tunnel_interface: None,
122            ssh_port_start: None,
123            ssh_port_end: None,
124        }
125    }
126}
127
/// Active workload tracking
///
/// One entry per provisioned container, keyed by `vmid` in
/// `ProviderService::active_workloads`.
#[derive(Debug, Clone, Serialize)]
pub struct WorkloadInfo {
    pub vmid: u32,
    pub workload_type: String,  // "lxc" or "vm"
    pub spec_id: String,
    pub created_at: u64,  // unix seconds
    pub expires_at: u64,  // unix seconds; cleanup loop tears down once past
    pub owner_npub: String,
}

/// Provider service that manages the node
///
/// Owns the compute backend and the Nostr client, plus the shared state
/// (workload table, stats) that the concurrent loops and the request
/// handlers access behind tokio mutexes.
pub struct ProviderService {
    config: ProviderConfig,
    backend: Arc<dyn ComputeBackend>,
    nostr: NostrRelaySubscriber,
    // vmid -> workload; written by the spawn handler, drained by cleanup.
    active_workloads: Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
    stats: Arc<Mutex<ProviderStats>>,
}

/// Rolling counters reported in the provider offer.
#[derive(Debug, Clone, Default)]
struct ProviderStats {
    // Number of jobs counted as finished (see cleanup loop / spawn path).
    total_jobs_completed: u64,
    // Unix seconds at service start; set in `new`, not read in this file.
    uptime_start: u64,
}

154impl ProviderService {
    /// Create a new provider service
    ///
    /// Builds the compute backend selected by `config.backend_type`,
    /// connects the Nostr client to the configured relays, and starts
    /// with an empty workload table.
    ///
    /// # Errors
    /// Fails if the Proxmox client cannot be constructed, the Nostr
    /// client cannot be created, or the system clock is before
    /// `UNIX_EPOCH`.
    pub async fn new(config: ProviderConfig) -> Result<Self> {
        let backend: Arc<dyn ComputeBackend> = match config.backend_type {
            BackendType::Proxmox => {
                let client = ProxmoxClient::new(
                    &config.proxmox_url,
                    &config.proxmox_token_id,
                    &config.proxmox_token_secret,
                    &config.proxmox_node,
                )?;
                Arc::new(ProxmoxBackend::new(
                    client,
                    &config.proxmox_storage,
                    &config.proxmox_bridge,
                    &config.proxmox_template,
                ))
            }
            BackendType::LXD => {
                Arc::new(LxdBackend::new(
                    &config.proxmox_storage, // Reuse storage field for pool name
                    &config.proxmox_bridge,  // Reuse bridge for network
                ))
            }
        };

        // Initialize Nostr client
        let relay_config = RelayConfig {
            relays: config.nostr_relays.clone(),
            private_key: Some(config.nostr_private_key.clone()),
        };
        let nostr = NostrRelaySubscriber::new(relay_config).await?;

        // Record service start time for the stats block below.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_secs();

        Ok(Self {
            config,
            backend,
            nostr,
            active_workloads: Arc::new(Mutex::new(HashMap::new())),
            stats: Arc::new(Mutex::new(ProviderStats {
                total_jobs_completed: 0,
                uptime_start: now,
            })),
        })
    }

    /// Get the provider's public key (npub)
    ///
    /// Delegates to the Nostr client; this is the identity peers use to
    /// address requests to this provider.
    pub fn get_npub(&self) -> String {
        self.nostr.get_service_public_key()
    }

    /// Start the provider service (runs forever)
    ///
    /// Publishes the initial offer, then drives the heartbeat loop, the
    /// request listener, and the expiry-cleanup loop concurrently. All
    /// three loops are written to run indefinitely; `tokio::select!`
    /// completes (cancelling the other branches) as soon as any one of
    /// them exits, so reaching a branch body is always an error path.
    pub async fn run(&self) -> Result<()> {
        info!("🚀 Starting Paygress Provider Service");
        info!("Provider: {}", self.config.provider_name);
        info!("NPUB: {}", self.get_npub());

        // Publish initial offer
        self.publish_offer().await?;

        // Run heartbeat loop and request listener concurrently
        tokio::select! {
            result = self.heartbeat_loop() => {
                error!("Heartbeat loop exited: {:?}", result);
                result
            }
            result = self.listen_for_requests() => {
                error!("Request listener exited: {:?}", result);
                result
            }
            result = self.cleanup_loop() => {
                error!("Cleanup loop exited: {:?}", result);
                result
            }
        }
    }

234    /// Publish provider offer to Nostr
235    async fn publish_offer(&self) -> Result<()> {
236        let stats = self.stats.lock().await;
237        
238        let offer = ProviderOfferContent {
239            provider_npub: self.get_npub(),
240            hostname: self.config.provider_name.clone(),
241            location: self.config.provider_location.clone(),
242            capabilities: self.config.capabilities.clone(),
243            specs: self.config.specs.clone(),
244            whitelisted_mints: self.config.whitelisted_mints.clone(),
245            uptime_percent: 100.0, // Will be calculated from heartbeat history
246            total_jobs_completed: stats.total_jobs_completed,
247            api_endpoint: None, // TODO: Add if supporting direct API
248        };
249
250        self.nostr.publish_provider_offer(offer).await?;
251        Ok(())
252    }
253
254    /// Send heartbeat every N seconds
255    async fn heartbeat_loop(&self) -> Result<()> {
256        let interval = tokio::time::Duration::from_secs(self.config.heartbeat_interval_secs);
257        
258        loop {
259            if let Err(e) = self.send_heartbeat().await {
260                warn!("Failed to send heartbeat: {}", e);
261            }
262            tokio::time::sleep(interval).await;
263        }
264    }
265
266    /// Send a single heartbeat
267    async fn send_heartbeat(&self) -> Result<()> {
268        let workloads = self.active_workloads.lock().await;
269        
270        // Get node status for capacity info
271        let capacity = match self.backend.get_node_status().await {
272            Ok(status) => CapacityInfo {
273                cpu_available: ((1.0 - status.cpu_usage) * 100000.0) as u64, // Convert to millicores
274                memory_mb_available: status.memory_total.saturating_sub(status.memory_used) / (1024 * 1024),
275                storage_gb_available: status.disk_total.saturating_sub(status.disk_used) / (1024 * 1024 * 1024), 
276            },
277            Err(e) => {
278                warn!("Failed to get node status: {}", e);
279                CapacityInfo {
280                    cpu_available: 0,
281                    memory_mb_available: 0,
282                    storage_gb_available: 0,
283                }
284            }
285        };
286
287        let now = std::time::SystemTime::now()
288            .duration_since(std::time::UNIX_EPOCH)?
289            .as_secs();
290
291        let heartbeat = HeartbeatContent {
292            provider_npub: self.get_npub(),
293            timestamp: now,
294            active_workloads: workloads.len() as u32,
295            available_capacity: capacity,
296        };
297
298        self.nostr.publish_heartbeat(heartbeat).await?;
299        Ok(())
300    }
301
    /// Listen for spawn requests via NIP-17
    ///
    /// Subscribes to pod events and dispatches each decrypted private
    /// message to the matching handler. The subscription callback must be
    /// `'static`, so every shared handle (backend, config, nostr client,
    /// workload table, stats) is cloned into it — and cloned again per
    /// event, because the handler returns an owned boxed future.
    async fn listen_for_requests(&self) -> Result<()> {
        info!("Listening for Paygress requests...");
        
        // Clone what we need for the handler
        let backend = self.backend.clone();
        let config = self.config.clone();
        let nostr = self.nostr.clone();
        let workloads = self.active_workloads.clone();
        let stats = self.stats.clone();

        self.nostr.subscribe_to_pod_events(move |event| {
            // Per-event clones: the returned future may outlive this call.
            let backend = backend.clone();
            let config = config.clone();
            let nostr = nostr.clone();
            let workloads = workloads.clone();
            let stats = stats.clone();
            
            Box::pin(async move {
                // Ignore events we published ourselves.
                let my_pubkey = nostr.public_key().to_hex();
                if event.pubkey == my_pubkey {
                    return Ok(());
                }

                debug!("Handler received event kind: {}, from: {}, message_type: {}", event.kind, event.pubkey, event.message_type);
                
                // Parse the request. A malformed payload is reported back
                // to the sender (best-effort) rather than failing the
                // subscription.
                let request_type = match parse_private_message_content(&event.content) {
                    Ok(req) => req,
                    Err(e) => {
                        warn!("Failed to parse request from {}: {}", event.pubkey, e);
                        let error = ErrorResponseContent {
                            error_type: "invalid_request".to_string(),
                            message: "Failed to parse request".to_string(),
                            details: Some(e.to_string()),
                        };
                        let _ = nostr.send_error_response_private_message(
                            &event.pubkey,
                            error,
                            &event.message_type,
                        ).await;
                        return Ok(());
                    }
                };

                debug!("Successfully parsed request metadata");

                // Dispatch to specific handler. Handler errors are logged
                // but never abort the subscription loop.
                match request_type {
                    PrivateRequest::Spawn(spawn_req) => {
                        if let Err(e) = handle_spawn_request(
                            backend.as_ref(),
                            &config,
                            &nostr,
                            &workloads,
                            &stats,
                            &event.pubkey,
                            &event.message_type,
                            spawn_req,
                        ).await {
                            error!("Failed to handle spawn request: {}", e);
                        }
                    }
                    PrivateRequest::Status(status_req) => {
                        if let Err(e) = handle_status_request(
                            backend.as_ref(),
                            &config,
                            &nostr,
                            &workloads,
                            &event.pubkey,
                            &event.message_type,
                            status_req,
                        ).await {
                            error!("Failed to handle status request: {}", e);
                        }
                    }
                    PrivateRequest::TopUp(_) => {
                        warn!("TopUp request received but not yet fully implemented");
                        let _ = nostr.send_error_response(
                            &event.pubkey,
                            "not_implemented",
                            "TopUp is not yet implemented on this provider",
                            None,
                            &event.message_type,
                        ).await;
                    }
                }

                Ok(())
            })
        }).await?;

        Ok(())
    }

397    /// Cleanup expired workloads
398    async fn cleanup_loop(&self) -> Result<()> {
399        let interval = tokio::time::Duration::from_secs(30);
400        
401        loop {
402            tokio::time::sleep(interval).await;
403            
404            let now = std::time::SystemTime::now()
405                .duration_since(std::time::UNIX_EPOCH)?
406                .as_secs();
407
408            let mut workloads = self.active_workloads.lock().await;
409            let expired: Vec<u32> = workloads
410                .iter()
411                .filter(|(_, w)| w.expires_at <= now)
412                .map(|(vmid, _)| *vmid)
413                .collect();
414
415            for vmid in expired {
416                info!("Cleaning up expired workload: {}", vmid);
417                
418                if let Some(_workload) = workloads.remove(&vmid) {
419                    let stop_result = self.backend.stop_container(vmid).await;
420                    let result = match stop_result {
421                        Ok(_) => self.backend.delete_container(vmid).await,
422                        Err(e) => Err(e),
423                    };
424
425                    match result {
426                        Ok(_) => {
427                            info!("Cleaned up workload {}", vmid);
428                            let mut stats = self.stats.lock().await;
429                            stats.total_jobs_completed += 1;
430                        }
431                        Err(e) => error!("Failed to cleanup workload {}: {}", vmid, e),
432                    }
433                }
434            }
435        }
436    }
437}

// Clone impl removed as ComputeBackend is Arc'd

441/// Handle a spawn request
442async fn handle_spawn_request(
443    backend: &dyn ComputeBackend,
444    config: &ProviderConfig,
445    nostr: &NostrRelaySubscriber,
446    workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
447    stats: &Arc<Mutex<ProviderStats>>,
448    requester_pubkey: &str,
449    message_type: &str,
450    request: EncryptedSpawnPodRequest,
451) -> Result<()> {
452    info!("Processing spawn request from {} (tier: {:?})", requester_pubkey, request.pod_spec_id);
453
454    // 1. Extract Cashu token value
455    let payment_msats = match crate::cashu::extract_token_value(&request.cashu_token).await {
456        Ok(v) => v,
457        Err(e) => {
458            let err_msg = format!("Invalid Cashu token: {}", e);
459            error!("{}", err_msg);
460            nostr.send_error_response(
461                requester_pubkey,
462                "invalid_token",
463                &err_msg,
464                None,
465                message_type,
466            ).await?;
467            return Ok(());
468        }
469    };
470
471    // 2. Find matching spec/tier
472    let spec = match config.specs.iter().find(|s| Some(s.id.clone()) == request.pod_spec_id) {
473        Some(s) => s,
474        None => {
475            // Default to first spec if none specified or not found
476            if let Some(s) = config.specs.first() {
477                s
478            } else {
479                let err_msg = "No pod specifications available on this provider";
480                error!("{}", err_msg);
481                nostr.send_error_response(
482                    requester_pubkey,
483                    "no_specs",
484                    err_msg,
485                    None,
486                    message_type,
487                ).await?;
488                return Ok(());
489            }
490        }
491    };
492
493    // 3. Calculate Duration
494    let duration_secs = payment_msats / spec.rate_msats_per_sec;
495    if duration_secs < config.minimum_duration_seconds {
496        let err_msg = format!(
497            "Insufficient payment for minimum duration. Required: {} msats for {}s",
498            config.minimum_duration_seconds * spec.rate_msats_per_sec,
499            config.minimum_duration_seconds
500        );
501        warn!("{}", err_msg);
502        nostr.send_error_response(
503            requester_pubkey,
504            "insufficient_payment",
505            &err_msg,
506            None,
507            message_type,
508        ).await?;
509        return Ok(());
510    }
511
512    info!("Validated payment: {} msats for {}s on tier {}", payment_msats, duration_secs, spec.name);
513
514    // 4. Find available ID
515    let id = match backend.find_available_id(
516        config.vmid_range_start,
517        config.vmid_range_end,
518    ).await {
519        Ok(id) => id,
520        Err(e) => {
521            let err_msg = format!("Failed to find available ID: {}", e);
522            error!("{}", err_msg);
523            nostr.send_error_response(
524                requester_pubkey,
525                "provisioning_error",
526                &err_msg,
527                None,
528                message_type,
529            ).await?;
530            return Ok(());
531        }
532    };
533
534    // 5. Generate credentials
535    let password = crate::sidecar_service::SidecarState::generate_password();
536    
537    // Calculate host port for forwarding
538    let host_port = match config.ssh_port_start {
539        Some(start) => start + (id - config.vmid_range_start) as u16,
540        None => 30000 + (id % 10000) as u16,
541    };
542
543    // 6. Create Container
544    let container_config = ContainerConfig {
545        id,
546        name: format!("paygress-{}", id),
547        image: request.pod_image.clone(),
548        cpu_cores: (spec.cpu_millicores / 1000).max(1) as u32,
549        memory_mb: spec.memory_mb as u32,
550        storage_gb: 10, // Default 10GB
551        password: password.clone(),
552        ssh_key: None,
553        host_port: Some(host_port),
554    };
555
556    debug!("Calling backend.create_container for workload {}", id);
557    if let Err(e) = backend.create_container(&container_config).await {
558        let err_msg = format!("Backend failed to create workload: {}", e);
559        error!("{}", err_msg);
560        nostr.send_error_response(
561            requester_pubkey,
562            "backend_error",
563            &err_msg,
564            None,
565            message_type,
566        ).await?;
567        return Ok(());
568    }
569    debug!("Successfully created container {}", id);
570
571    let now = std::time::SystemTime::now()
572        .duration_since(std::time::UNIX_EPOCH)?
573        .as_secs();
574
575    // 7. Track Workload
576    let workload = WorkloadInfo {
577        vmid: id,
578        workload_type: "lxc".to_string(), // Default for Proxmox/LXD
579        spec_id: spec.id.clone(),
580        created_at: now,
581        expires_at: now + duration_secs,
582        owner_npub: requester_pubkey.to_string(),
583    };
584
585    workloads.lock().await.insert(id, workload.clone());
586    
587    // Update stats
588    {
589        let mut s = stats.lock().await;
590        s.total_jobs_completed += 1;
591    }
592
593    // 8. Get Access Details
594    // Use configured public IP/host
595    let host = &config.public_ip;
596    
597    // Send access details
598    let expires_dt = chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
599    let details = AccessDetailsContent {
600        pod_npub: format!("container-{}", id),
601        node_port: host_port,
602        expires_at: expires_dt.to_rfc3339(),
603        cpu_millicores: spec.cpu_millicores,
604        memory_mb: spec.memory_mb,
605        pod_spec_name: spec.name.clone(),
606        pod_spec_description: spec.description.clone(),
607        instructions: vec![
608            format!("🚀 Workload provisioned successfully!"),
609            format!("👤 Username: root"),
610            format!("🔑 Password: {}", password),
611            format!("⌛ Expires: {}", expires_dt.format("%Y-%m-%d %H:%M:%S UTC")),
612            format!("Access: You can connect to the container using SSH."),
613            format!("  ssh -p {} root@{}", host_port, host),
614        ],
615    };
616
617    debug!("Sending access details to {}", requester_pubkey);
618    nostr.send_access_details_private_message(
619        requester_pubkey,
620        details,
621        message_type,
622    ).await?;
623
624    debug!("Access details sent successfully");
625
626    info!("Workload {} provisioned for {} seconds", id, duration_secs);
627    Ok(())
628}
629
630/// Handle a status request
631async fn handle_status_request(
632    backend: &dyn ComputeBackend,
633    config: &ProviderConfig,
634    nostr: &NostrRelaySubscriber,
635    workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
636    requester_pubkey: &str,
637    message_type: &str,
638    request: StatusRequestContent,
639) -> Result<()> {
640    info!("Processing status request for pod {} from {}", request.pod_id, requester_pubkey);
641
642    // 1. Try to find the workload by ID (which could be vmid)
643    let vmid = request.pod_id.parse::<u32>().ok();
644    
645    let workload = {
646        let lock = workloads.lock().await;
647        if let Some(vmid) = vmid {
648            lock.get(&vmid).cloned()
649        } else {
650            // If not a number, maybe it's a pod_npub? (not yet implemented in tracking, but we search by owner for now)
651            lock.values().find(|w| w.owner_npub == request.pod_id || w.owner_npub == requester_pubkey).cloned()
652        }
653    };
654
655    let workload = match workload {
656        Some(w) => w,
657        None => {
658            let err_msg = format!("Workload {} not found or you don't have access", request.pod_id);
659            warn!("{}", err_msg);
660            nostr.send_error_response(
661                requester_pubkey,
662                "not_found",
663                &err_msg,
664                None,
665                message_type,
666            ).await?;
667            return Ok(());
668        }
669    };
670
671    // 2. Check backend status
672    let status_info = match backend.get_node_status().await {
673        Ok(s) => s,
674        Err(_) => crate::compute::NodeStatus { cpu_usage: 0.0, memory_used: 0, memory_total: 0, disk_used: 0, disk_total: 0 },
675    };
676
677    // 3. Prepare response
678    let now = std::time::SystemTime::now()
679        .duration_since(std::time::UNIX_EPOCH)?
680        .as_secs();
681    
682    let time_remaining = workload.expires_at.saturating_sub(now);
683    let status = if time_remaining == 0 { "Expired" } else { "Running" };
684
685    let expires_dt = chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
686    
687    // Look up spec for actual resource values
688    let spec = config.specs.iter()
689        .find(|s| s.id == workload.spec_id);
690    let cpu = spec.map(|s| s.cpu_millicores).unwrap_or(1000);
691    let mem = spec.map(|s| s.memory_mb).unwrap_or(1024);
692    let host_port = match config.ssh_port_start {
693        Some(start) => start + (workload.vmid - config.vmid_range_start) as u16,
694        None => (30000 + (workload.vmid % 10000)) as u16,
695    };
696
697    let response = StatusResponseContent {
698        pod_id: workload.vmid.to_string(),
699        status: status.to_string(),
700        expires_at: expires_dt.to_rfc3339(),
701        time_remaining_seconds: time_remaining,
702        cpu_millicores: cpu,
703        memory_mb: mem,
704        ssh_host: config.public_ip.clone(),
705        ssh_port: host_port,
706        ssh_username: "root".to_string(),
707    };
708
709    nostr.send_status_response(
710        requester_pubkey,
711        response,
712        message_type,
713    ).await?;
714
715    info!("Status response sent for workload {}", workload.vmid);
716    Ok(())
717}
718
719/// Load provider config from file
720pub fn load_config(path: &str) -> Result<ProviderConfig> {
721    let content = std::fs::read_to_string(path)
722        .context(format!("Failed to read config file: {}", path))?;
723    
724    serde_json::from_str(&content)
725        .context("Failed to parse provider config")
726}
727
728/// Save provider config to file
729pub fn save_config(path: &str, config: &ProviderConfig) -> Result<()> {
730    let content = serde_json::to_string_pretty(config)?;
731    std::fs::write(path, content)
732        .context(format!("Failed to write config file: {}", path))?;
733    Ok(())
734}