1use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tracing::{debug, error, info, warn};
14
15use crate::nostr::{
16 NostrRelaySubscriber, RelayConfig, ProviderOfferContent, HeartbeatContent,
17 CapacityInfo, PodSpec, EncryptedSpawnPodRequest, AccessDetailsContent,
18 ErrorResponseContent, parse_private_message_content, PrivateRequest,
19 StatusRequestContent, StatusResponseContent,
20};
21use crate::proxmox::{ProxmoxClient, ProxmoxBackend};
22use crate::compute::{ComputeBackend, ContainerConfig};
23use crate::lxd::LxdBackend;
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum BackendType {
27 Proxmox,
28 LXD,
29}
30
31impl Default for BackendType {
32 fn default() -> Self {
33 Self::Proxmox
34 }
35}
36
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct ProviderConfig {
41 #[serde(default)]
42 pub backend_type: BackendType,
43
44 pub proxmox_url: String,
47 pub proxmox_token_id: String,
48 pub proxmox_token_secret: String,
49 pub proxmox_node: String,
50 pub proxmox_storage: String,
51 pub proxmox_template: String,
52 pub proxmox_bridge: String,
53 pub vmid_range_start: u32,
54 pub vmid_range_end: u32,
55
56 pub nostr_private_key: String,
58 pub nostr_relays: Vec<String>,
59
60 pub provider_name: String,
62 pub provider_location: Option<String>,
63 pub public_ip: String,
64 pub capabilities: Vec<String>,
65
66 pub specs: Vec<PodSpec>,
68 pub whitelisted_mints: Vec<String>,
69
70 pub heartbeat_interval_secs: u64,
72 pub minimum_duration_seconds: u64,
73
74 #[serde(default)]
76 pub tunnel_enabled: bool,
77 #[serde(default)]
78 pub tunnel_interface: Option<String>,
79 #[serde(default)]
80 pub ssh_port_start: Option<u16>,
81 #[serde(default)]
82 pub ssh_port_end: Option<u16>,
83}
84
85impl Default for ProviderConfig {
86 fn default() -> Self {
87 Self {
88 backend_type: BackendType::Proxmox,
89 proxmox_url: "https://localhost:8006/api2/json".to_string(),
90 proxmox_token_id: "root@pam!paygress".to_string(),
91 proxmox_token_secret: String::new(),
92 proxmox_node: "pve".to_string(),
93 proxmox_storage: "local-lvm".to_string(),
94 proxmox_template: "local:vztmpl/ubuntu-22.04-standard.tar.zst".to_string(),
95 proxmox_bridge: "vmbr0".to_string(),
96 vmid_range_start: 1000,
97 vmid_range_end: 1999,
98 nostr_private_key: String::new(),
99 nostr_relays: vec![
100 "wss://relay.damus.io".to_string(),
101 "wss://nos.lol".to_string(),
102 ],
103 provider_name: "Paygress Provider".to_string(),
104 provider_location: None,
105 public_ip: "127.0.0.1".to_string(),
106 capabilities: vec!["lxc".to_string()],
107 specs: vec![
108 PodSpec {
109 id: "basic".to_string(),
110 name: "Basic".to_string(),
111 description: "1 vCPU, 1GB RAM".to_string(),
112 cpu_millicores: 1000,
113 memory_mb: 1024,
114 rate_msats_per_sec: 50,
115 },
116 ],
117 whitelisted_mints: vec!["https://mint.minibits.cash".to_string()],
118 heartbeat_interval_secs: 60,
119 minimum_duration_seconds: 60,
120 tunnel_enabled: false,
121 tunnel_interface: None,
122 ssh_port_start: None,
123 ssh_port_end: None,
124 }
125 }
126}
127
128#[derive(Debug, Clone, Serialize)]
130pub struct WorkloadInfo {
131 pub vmid: u32,
132 pub workload_type: String, pub spec_id: String,
134 pub created_at: u64,
135 pub expires_at: u64,
136 pub owner_npub: String,
137}
138
139pub struct ProviderService {
141 config: ProviderConfig,
142 backend: Arc<dyn ComputeBackend>,
143 nostr: NostrRelaySubscriber,
144 active_workloads: Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
145 stats: Arc<Mutex<ProviderStats>>,
146}
147
148#[derive(Debug, Clone, Default)]
149struct ProviderStats {
150 total_jobs_completed: u64,
151 uptime_start: u64,
152}
153
154impl ProviderService {
155 pub async fn new(config: ProviderConfig) -> Result<Self> {
157 let backend: Arc<dyn ComputeBackend> = match config.backend_type {
158 BackendType::Proxmox => {
159 let client = ProxmoxClient::new(
160 &config.proxmox_url,
161 &config.proxmox_token_id,
162 &config.proxmox_token_secret,
163 &config.proxmox_node,
164 )?;
165 Arc::new(ProxmoxBackend::new(
166 client,
167 &config.proxmox_storage,
168 &config.proxmox_bridge,
169 &config.proxmox_template,
170 ))
171 }
172 BackendType::LXD => {
173 Arc::new(LxdBackend::new(
174 &config.proxmox_storage, &config.proxmox_bridge, ))
177 }
178 };
179
180 let relay_config = RelayConfig {
182 relays: config.nostr_relays.clone(),
183 private_key: Some(config.nostr_private_key.clone()),
184 };
185 let nostr = NostrRelaySubscriber::new(relay_config).await?;
186
187 let now = std::time::SystemTime::now()
188 .duration_since(std::time::UNIX_EPOCH)?
189 .as_secs();
190
191 Ok(Self {
192 config,
193 backend,
194 nostr,
195 active_workloads: Arc::new(Mutex::new(HashMap::new())),
196 stats: Arc::new(Mutex::new(ProviderStats {
197 total_jobs_completed: 0,
198 uptime_start: now,
199 })),
200 })
201 }
202
203 pub fn get_npub(&self) -> String {
205 self.nostr.get_service_public_key()
206 }
207
208 pub async fn run(&self) -> Result<()> {
210 info!("🚀 Starting Paygress Provider Service");
211 info!("Provider: {}", self.config.provider_name);
212 info!("NPUB: {}", self.get_npub());
213
214 self.publish_offer().await?;
216
217 tokio::select! {
219 result = self.heartbeat_loop() => {
220 error!("Heartbeat loop exited: {:?}", result);
221 result
222 }
223 result = self.listen_for_requests() => {
224 error!("Request listener exited: {:?}", result);
225 result
226 }
227 result = self.cleanup_loop() => {
228 error!("Cleanup loop exited: {:?}", result);
229 result
230 }
231 }
232 }
233
234 async fn publish_offer(&self) -> Result<()> {
236 let stats = self.stats.lock().await;
237
238 let offer = ProviderOfferContent {
239 provider_npub: self.get_npub(),
240 hostname: self.config.provider_name.clone(),
241 location: self.config.provider_location.clone(),
242 capabilities: self.config.capabilities.clone(),
243 specs: self.config.specs.clone(),
244 whitelisted_mints: self.config.whitelisted_mints.clone(),
245 uptime_percent: 100.0, total_jobs_completed: stats.total_jobs_completed,
247 api_endpoint: None, };
249
250 self.nostr.publish_provider_offer(offer).await?;
251 Ok(())
252 }
253
254 async fn heartbeat_loop(&self) -> Result<()> {
256 let interval = tokio::time::Duration::from_secs(self.config.heartbeat_interval_secs);
257
258 loop {
259 if let Err(e) = self.send_heartbeat().await {
260 warn!("Failed to send heartbeat: {}", e);
261 }
262 tokio::time::sleep(interval).await;
263 }
264 }
265
266 async fn send_heartbeat(&self) -> Result<()> {
268 let workloads = self.active_workloads.lock().await;
269
270 let capacity = match self.backend.get_node_status().await {
272 Ok(status) => CapacityInfo {
273 cpu_available: ((1.0 - status.cpu_usage) * 100000.0) as u64, memory_mb_available: status.memory_total.saturating_sub(status.memory_used) / (1024 * 1024),
275 storage_gb_available: status.disk_total.saturating_sub(status.disk_used) / (1024 * 1024 * 1024),
276 },
277 Err(e) => {
278 warn!("Failed to get node status: {}", e);
279 CapacityInfo {
280 cpu_available: 0,
281 memory_mb_available: 0,
282 storage_gb_available: 0,
283 }
284 }
285 };
286
287 let now = std::time::SystemTime::now()
288 .duration_since(std::time::UNIX_EPOCH)?
289 .as_secs();
290
291 let heartbeat = HeartbeatContent {
292 provider_npub: self.get_npub(),
293 timestamp: now,
294 active_workloads: workloads.len() as u32,
295 available_capacity: capacity,
296 };
297
298 self.nostr.publish_heartbeat(heartbeat).await?;
299 Ok(())
300 }
301
302 async fn listen_for_requests(&self) -> Result<()> {
304 info!("Listening for Paygress requests...");
305
306 let backend = self.backend.clone();
308 let config = self.config.clone();
309 let nostr = self.nostr.clone();
310 let workloads = self.active_workloads.clone();
311 let stats = self.stats.clone();
312
313 self.nostr.subscribe_to_pod_events(move |event| {
314 let backend = backend.clone();
315 let config = config.clone();
316 let nostr = nostr.clone();
317 let workloads = workloads.clone();
318 let stats = stats.clone();
319
320 Box::pin(async move {
321 let my_pubkey = nostr.public_key().to_hex();
322 if event.pubkey == my_pubkey {
323 return Ok(());
324 }
325
326 debug!("Handler received event kind: {}, from: {}, message_type: {}", event.kind, event.pubkey, event.message_type);
327
328 let request_type = match parse_private_message_content(&event.content) {
330 Ok(req) => req,
331 Err(e) => {
332 warn!("Failed to parse request from {}: {}", event.pubkey, e);
333 let error = ErrorResponseContent {
334 error_type: "invalid_request".to_string(),
335 message: "Failed to parse request".to_string(),
336 details: Some(e.to_string()),
337 };
338 let _ = nostr.send_error_response_private_message(
339 &event.pubkey,
340 error,
341 &event.message_type,
342 ).await;
343 return Ok(());
344 }
345 };
346
347 debug!("Successfully parsed request metadata");
348
349 match request_type {
351 PrivateRequest::Spawn(spawn_req) => {
352 if let Err(e) = handle_spawn_request(
353 backend.as_ref(),
354 &config,
355 &nostr,
356 &workloads,
357 &stats,
358 &event.pubkey,
359 &event.message_type,
360 spawn_req,
361 ).await {
362 error!("Failed to handle spawn request: {}", e);
363 }
364 }
365 PrivateRequest::Status(status_req) => {
366 if let Err(e) = handle_status_request(
367 backend.as_ref(),
368 &config,
369 &nostr,
370 &workloads,
371 &event.pubkey,
372 &event.message_type,
373 status_req,
374 ).await {
375 error!("Failed to handle status request: {}", e);
376 }
377 }
378 PrivateRequest::TopUp(_) => {
379 warn!("TopUp request received but not yet fully implemented");
380 let _ = nostr.send_error_response(
381 &event.pubkey,
382 "not_implemented",
383 "TopUp is not yet implemented on this provider",
384 None,
385 &event.message_type,
386 ).await;
387 }
388 }
389
390 Ok(())
391 })
392 }).await?;
393
394 Ok(())
395 }
396
397 async fn cleanup_loop(&self) -> Result<()> {
399 let interval = tokio::time::Duration::from_secs(30);
400
401 loop {
402 tokio::time::sleep(interval).await;
403
404 let now = std::time::SystemTime::now()
405 .duration_since(std::time::UNIX_EPOCH)?
406 .as_secs();
407
408 let mut workloads = self.active_workloads.lock().await;
409 let expired: Vec<u32> = workloads
410 .iter()
411 .filter(|(_, w)| w.expires_at <= now)
412 .map(|(vmid, _)| *vmid)
413 .collect();
414
415 for vmid in expired {
416 info!("Cleaning up expired workload: {}", vmid);
417
418 if let Some(_workload) = workloads.remove(&vmid) {
419 let stop_result = self.backend.stop_container(vmid).await;
420 let result = match stop_result {
421 Ok(_) => self.backend.delete_container(vmid).await,
422 Err(e) => Err(e),
423 };
424
425 match result {
426 Ok(_) => {
427 info!("Cleaned up workload {}", vmid);
428 let mut stats = self.stats.lock().await;
429 stats.total_jobs_completed += 1;
430 }
431 Err(e) => error!("Failed to cleanup workload {}: {}", vmid, e),
432 }
433 }
434 }
435 }
436 }
437}
438
439async fn handle_spawn_request(
443 backend: &dyn ComputeBackend,
444 config: &ProviderConfig,
445 nostr: &NostrRelaySubscriber,
446 workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
447 stats: &Arc<Mutex<ProviderStats>>,
448 requester_pubkey: &str,
449 message_type: &str,
450 request: EncryptedSpawnPodRequest,
451) -> Result<()> {
452 info!("Processing spawn request from {} (tier: {:?})", requester_pubkey, request.pod_spec_id);
453
454 let payment_msats = match crate::cashu::extract_token_value(&request.cashu_token).await {
456 Ok(v) => v,
457 Err(e) => {
458 let err_msg = format!("Invalid Cashu token: {}", e);
459 error!("{}", err_msg);
460 nostr.send_error_response(
461 requester_pubkey,
462 "invalid_token",
463 &err_msg,
464 None,
465 message_type,
466 ).await?;
467 return Ok(());
468 }
469 };
470
471 let spec = match config.specs.iter().find(|s| Some(s.id.clone()) == request.pod_spec_id) {
473 Some(s) => s,
474 None => {
475 if let Some(s) = config.specs.first() {
477 s
478 } else {
479 let err_msg = "No pod specifications available on this provider";
480 error!("{}", err_msg);
481 nostr.send_error_response(
482 requester_pubkey,
483 "no_specs",
484 err_msg,
485 None,
486 message_type,
487 ).await?;
488 return Ok(());
489 }
490 }
491 };
492
493 let duration_secs = payment_msats / spec.rate_msats_per_sec;
495 if duration_secs < config.minimum_duration_seconds {
496 let err_msg = format!(
497 "Insufficient payment for minimum duration. Required: {} msats for {}s",
498 config.minimum_duration_seconds * spec.rate_msats_per_sec,
499 config.minimum_duration_seconds
500 );
501 warn!("{}", err_msg);
502 nostr.send_error_response(
503 requester_pubkey,
504 "insufficient_payment",
505 &err_msg,
506 None,
507 message_type,
508 ).await?;
509 return Ok(());
510 }
511
512 info!("Validated payment: {} msats for {}s on tier {}", payment_msats, duration_secs, spec.name);
513
514 let id = match backend.find_available_id(
516 config.vmid_range_start,
517 config.vmid_range_end,
518 ).await {
519 Ok(id) => id,
520 Err(e) => {
521 let err_msg = format!("Failed to find available ID: {}", e);
522 error!("{}", err_msg);
523 nostr.send_error_response(
524 requester_pubkey,
525 "provisioning_error",
526 &err_msg,
527 None,
528 message_type,
529 ).await?;
530 return Ok(());
531 }
532 };
533
534 let password = crate::sidecar_service::SidecarState::generate_password();
536
537 let host_port = match config.ssh_port_start {
539 Some(start) => start + (id - config.vmid_range_start) as u16,
540 None => 30000 + (id % 10000) as u16,
541 };
542
543 let container_config = ContainerConfig {
545 id,
546 name: format!("paygress-{}", id),
547 image: request.pod_image.clone(),
548 cpu_cores: (spec.cpu_millicores / 1000).max(1) as u32,
549 memory_mb: spec.memory_mb as u32,
550 storage_gb: 10, password: password.clone(),
552 ssh_key: None,
553 host_port: Some(host_port),
554 };
555
556 debug!("Calling backend.create_container for workload {}", id);
557 if let Err(e) = backend.create_container(&container_config).await {
558 let err_msg = format!("Backend failed to create workload: {}", e);
559 error!("{}", err_msg);
560 nostr.send_error_response(
561 requester_pubkey,
562 "backend_error",
563 &err_msg,
564 None,
565 message_type,
566 ).await?;
567 return Ok(());
568 }
569 debug!("Successfully created container {}", id);
570
571 let now = std::time::SystemTime::now()
572 .duration_since(std::time::UNIX_EPOCH)?
573 .as_secs();
574
575 let workload = WorkloadInfo {
577 vmid: id,
578 workload_type: "lxc".to_string(), spec_id: spec.id.clone(),
580 created_at: now,
581 expires_at: now + duration_secs,
582 owner_npub: requester_pubkey.to_string(),
583 };
584
585 workloads.lock().await.insert(id, workload.clone());
586
587 {
589 let mut s = stats.lock().await;
590 s.total_jobs_completed += 1;
591 }
592
593 let host = &config.public_ip;
596
597 let expires_dt = chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
599 let details = AccessDetailsContent {
600 pod_npub: format!("container-{}", id),
601 node_port: host_port,
602 expires_at: expires_dt.to_rfc3339(),
603 cpu_millicores: spec.cpu_millicores,
604 memory_mb: spec.memory_mb,
605 pod_spec_name: spec.name.clone(),
606 pod_spec_description: spec.description.clone(),
607 instructions: vec![
608 format!("🚀 Workload provisioned successfully!"),
609 format!("👤 Username: root"),
610 format!("🔑 Password: {}", password),
611 format!("⌛ Expires: {}", expires_dt.format("%Y-%m-%d %H:%M:%S UTC")),
612 format!("Access: You can connect to the container using SSH."),
613 format!(" ssh -p {} root@{}", host_port, host),
614 ],
615 };
616
617 debug!("Sending access details to {}", requester_pubkey);
618 nostr.send_access_details_private_message(
619 requester_pubkey,
620 details,
621 message_type,
622 ).await?;
623
624 debug!("Access details sent successfully");
625
626 info!("Workload {} provisioned for {} seconds", id, duration_secs);
627 Ok(())
628}
629
630async fn handle_status_request(
632 backend: &dyn ComputeBackend,
633 config: &ProviderConfig,
634 nostr: &NostrRelaySubscriber,
635 workloads: &Arc<Mutex<HashMap<u32, WorkloadInfo>>>,
636 requester_pubkey: &str,
637 message_type: &str,
638 request: StatusRequestContent,
639) -> Result<()> {
640 info!("Processing status request for pod {} from {}", request.pod_id, requester_pubkey);
641
642 let vmid = request.pod_id.parse::<u32>().ok();
644
645 let workload = {
646 let lock = workloads.lock().await;
647 if let Some(vmid) = vmid {
648 lock.get(&vmid).cloned()
649 } else {
650 lock.values().find(|w| w.owner_npub == request.pod_id || w.owner_npub == requester_pubkey).cloned()
652 }
653 };
654
655 let workload = match workload {
656 Some(w) => w,
657 None => {
658 let err_msg = format!("Workload {} not found or you don't have access", request.pod_id);
659 warn!("{}", err_msg);
660 nostr.send_error_response(
661 requester_pubkey,
662 "not_found",
663 &err_msg,
664 None,
665 message_type,
666 ).await?;
667 return Ok(());
668 }
669 };
670
671 let status_info = match backend.get_node_status().await {
673 Ok(s) => s,
674 Err(_) => crate::compute::NodeStatus { cpu_usage: 0.0, memory_used: 0, memory_total: 0, disk_used: 0, disk_total: 0 },
675 };
676
677 let now = std::time::SystemTime::now()
679 .duration_since(std::time::UNIX_EPOCH)?
680 .as_secs();
681
682 let time_remaining = workload.expires_at.saturating_sub(now);
683 let status = if time_remaining == 0 { "Expired" } else { "Running" };
684
685 let expires_dt = chrono::DateTime::from_timestamp(workload.expires_at as i64, 0).unwrap_or_default();
686
687 let spec = config.specs.iter()
689 .find(|s| s.id == workload.spec_id);
690 let cpu = spec.map(|s| s.cpu_millicores).unwrap_or(1000);
691 let mem = spec.map(|s| s.memory_mb).unwrap_or(1024);
692 let host_port = match config.ssh_port_start {
693 Some(start) => start + (workload.vmid - config.vmid_range_start) as u16,
694 None => (30000 + (workload.vmid % 10000)) as u16,
695 };
696
697 let response = StatusResponseContent {
698 pod_id: workload.vmid.to_string(),
699 status: status.to_string(),
700 expires_at: expires_dt.to_rfc3339(),
701 time_remaining_seconds: time_remaining,
702 cpu_millicores: cpu,
703 memory_mb: mem,
704 ssh_host: config.public_ip.clone(),
705 ssh_port: host_port,
706 ssh_username: "root".to_string(),
707 };
708
709 nostr.send_status_response(
710 requester_pubkey,
711 response,
712 message_type,
713 ).await?;
714
715 info!("Status response sent for workload {}", workload.vmid);
716 Ok(())
717}
718
719pub fn load_config(path: &str) -> Result<ProviderConfig> {
721 let content = std::fs::read_to_string(path)
722 .context(format!("Failed to read config file: {}", path))?;
723
724 serde_json::from_str(&content)
725 .context("Failed to parse provider config")
726}
727
728pub fn save_config(path: &str, config: &ProviderConfig) -> Result<()> {
730 let content = serde_json::to_string_pretty(config)?;
731 std::fs::write(path, content)
732 .context(format!("Failed to write config file: {}", path))?;
733 Ok(())
734}