1use serde::Serialize;
2use std::sync::Arc;
3use tracing::{info, warn};
4use std::collections::{HashMap, BTreeMap, HashSet};
5use chrono::{DateTime, Utc};
6
7use std::sync::Mutex;
9
10use crate::nostr;
11use crate::cashu::initialize_cashu;
12
13#[derive(Clone, Debug)]
15pub struct SidecarConfig {
16 pub cashu_db_path: String,
17 pub pod_namespace: String,
18 pub minimum_pod_duration_seconds: u64, pub base_image: String, pub ssh_host: String, pub ssh_port_range_start: u16, pub ssh_port_range_end: u16, pub enable_cleanup_task: bool,
24 pub whitelisted_mints: Vec<String>, pub pod_specs: Vec<nostr::PodSpec>, }
27
28impl Default for SidecarConfig {
29 fn default() -> Self {
30 Self {
31 cashu_db_path: "./cashu.db".to_string(),
32 pod_namespace: "user-workloads".to_string(),
33 minimum_pod_duration_seconds: 60, base_image: "linuxserver/openssh-server:latest".to_string(),
35 ssh_host: "localhost".to_string(),
36 ssh_port_range_start: 30000,
37 ssh_port_range_end: 31000,
38 enable_cleanup_task: true,
39 whitelisted_mints: vec![], pod_specs: vec![], }
42 }
43}
44
/// Bookkeeping for the SSH ports this process may hand out.
///
/// A port lives in at most one of the two sets: `available_ports` (free to
/// hand out) or `allocated_ports` (already handed out).
#[derive(Debug)]
pub struct PortPool {
    /// Ports that have not been handed out.
    available_ports: HashSet<u16>,
    /// Ports that have been handed out.
    allocated_ports: HashSet<u16>,
    /// Lower bound (inclusive) the pool was created with.
    range_start: u16,
    /// Upper bound (inclusive) the pool was created with.
    range_end: u16,
}

impl PortPool {
    /// Creates a pool in which every port of `range_start..=range_end` is
    /// available and none is allocated.
    ///
    /// A reversed range (`range_start > range_end`) produces an empty pool.
    pub fn new(range_start: u16, range_end: u16) -> Self {
        PortPool {
            available_ports: (range_start..=range_end).collect(),
            allocated_ports: HashSet::new(),
            range_start,
            range_end,
        }
    }

    /// Number of ports currently marked as available.
    pub fn available_count(&self) -> usize {
        self.available_ports.len()
    }

    /// Number of ports currently marked as allocated.
    pub fn allocated_count(&self) -> usize {
        self.allocated_ports.len()
    }
}
78
79#[derive(Clone)]
81pub struct SidecarState {
82 pub config: SidecarConfig,
83 pub k8s_client: Arc<PodManager>,
84 pub active_pods: Arc<tokio::sync::RwLock<HashMap<String, PodInfo>>>,
85 pub port_pool: Arc<Mutex<PortPool>>,
86}
87
88#[derive(Clone, Debug, Serialize)]
90pub struct PodInfo {
91 pub pod_npub: String, pub namespace: String,
93 pub created_at: DateTime<Utc>,
94 pub expires_at: DateTime<Utc>,
95 pub allocated_port: u16, pub ssh_username: String,
97 pub ssh_password: String,
98 pub payment_amount_msats: u64,
99 pub duration_seconds: u64,
100 pub node_port: Option<u16>,
101 pub nostr_public_key: String, pub nostr_private_key: String, }
104
105
106
107pub struct PodManager {
109 pub client: kube::Client,
110}
111
112impl PodManager {
113 pub async fn new() -> Result<Self, String> {
114 let client = kube::Client::try_default().await.map_err(|e| format!("Failed to create Kubernetes client: {}", e))?;
115 Ok(Self { client })
116 }
117
118 pub async fn create_ssh_pod(
119 &self,
120 _config: &SidecarConfig,
121 namespace: &str,
122 pod_name: &str,
123 pod_npub: &str, pod_nsec: &str, image: &str,
126 ssh_port: u16,
127 username: &str,
128 password: &str,
129 duration_seconds: u64,
130 memory_mb: u64,
131 cpu_millicores: u64,
132 user_pubkey: &str, ) -> Result<u16, String> { use k8s_openapi::api::core::v1::{
135 Container, Pod, PodSpec, EnvVar, ContainerPort, Volume,
136 };
137 use kube::api::PostParams;
138 use kube::Api;
139
140 let pods: Api<Pod> = Api::namespaced(self.client.clone(), namespace);
141
142 let env_vars = vec![
144 EnvVar {
145 name: "PUID".to_string(),
146 value: Some("1000".to_string()),
147 value_from: None,
148 },
149 EnvVar {
150 name: "PGID".to_string(),
151 value: Some("1000".to_string()),
152 value_from: None,
153 },
154 EnvVar {
155 name: "TZ".to_string(),
156 value: Some("Etc/UTC".to_string()),
157 value_from: None,
158 },
159 EnvVar {
160 name: "PUBLIC_KEY_FILE".to_string(),
161 value: Some("/config/.ssh/authorized_keys".to_string()),
162 value_from: None,
163 },
164 EnvVar {
165 name: "USER_NAME".to_string(),
166 value: Some(username.to_string()),
167 value_from: None,
168 },
169 EnvVar {
170 name: "USER_PASSWORD".to_string(),
171 value: Some(password.to_string()),
172 value_from: None,
173 },
174 EnvVar {
175 name: "PASSWORD_ACCESS".to_string(),
176 value: Some("true".to_string()),
177 value_from: None,
178 },
179 EnvVar {
181 name: "POD_NPUB".to_string(),
182 value: Some(pod_npub.to_string()),
183 value_from: None,
184 },
185 EnvVar {
186 name: "POD_NSEC".to_string(),
187 value: Some(pod_nsec.to_string()),
188 value_from: None,
189 },
190 EnvVar {
191 name: "USER_PUBKEY".to_string(),
192 value: Some(user_pubkey.to_string()),
193 value_from: None,
194 },
195 EnvVar {
196 name: "NOSTR_RELAYS".to_string(),
197 value: Some("wss://relay.damus.io,wss://nos.lol,wss://relay.nostr.band".to_string()),
198 value_from: None,
199 },
200 EnvVar {
201 name: "SSH_PORT".to_string(),
202 value: Some(ssh_port.to_string()), value_from: None,
204 },
205 ];
206
207 let mut labels = BTreeMap::new();
209 labels.insert("app".to_string(), "paygress-ssh-pod".to_string());
210 labels.insert("managed-by".to_string(), "paygress-sidecar".to_string());
211 labels.insert("pod-type".to_string(), "ssh-access".to_string());
212 labels.insert("pod-name".to_string(), pod_name.to_string());
213 let npub_hex = if let Some(stripped) = pod_npub.strip_prefix("npub1") {
215 stripped } else {
217 pod_npub };
219 let truncated_hex = if npub_hex.len() > 63 {
221 &npub_hex[..63]
222 } else {
223 npub_hex
224 };
225 labels.insert("pod-npub".to_string(), truncated_hex.to_string()); let mut annotations = BTreeMap::new();
228 annotations.insert("paygress.io/created-at".to_string(), Utc::now().to_rfc3339());
229 annotations.insert("paygress.io/expires-at".to_string(),
230 (Utc::now() + chrono::Duration::seconds(duration_seconds as i64)).to_rfc3339());
231 annotations.insert("paygress.io/duration-seconds".to_string(), duration_seconds.to_string());
232 annotations.insert("paygress.io/ssh-username".to_string(), username.to_string());
233 let _volumes: Vec<Volume> = Vec::new();
237
238 let containers = vec![Container {
240 name: "ssh-server".to_string(),
241 image: Some(image.to_string()),
242 ports: Some(vec![ContainerPort {
243 container_port: ssh_port as i32, host_port: None, name: Some("ssh".to_string()),
246 protocol: Some("TCP".to_string()),
247 ..Default::default()
248 }]),
249 env: Some(env_vars),
250 image_pull_policy: Some("IfNotPresent".to_string()),
251 command: Some(vec![
254 "/bin/sh".to_string(),
255 "-c".to_string(),
256 format!(
257 r#"set -e
258echo "Setting up SSH access on port {ssh_port}..."
259
260# Detect package manager and install OpenSSH if not present
261if command -v apt-get >/dev/null 2>&1; then
262 export DEBIAN_FRONTEND=noninteractive
263 apt-get update -qq && apt-get install -y -qq openssh-server sudo 2>/dev/null || true
264 mkdir -p /run/sshd
265elif command -v apk >/dev/null 2>&1; then
266 apk add --no-cache openssh sudo 2>/dev/null || true
267 ssh-keygen -A 2>/dev/null || true
268elif command -v yum >/dev/null 2>&1; then
269 yum install -y openssh-server sudo 2>/dev/null || true
270fi
271
272# Detect available shell
273if [ -f /bin/bash ]; then
274 DEFAULT_SHELL="/bin/bash"
275else
276 DEFAULT_SHELL="/bin/sh"
277fi
278
279# Create user if it doesn't exist
280if ! id "{username}" >/dev/null 2>&1; then
281 useradd -m -s "$DEFAULT_SHELL" "{username}" 2>/dev/null || adduser -D -s "$DEFAULT_SHELL" "{username}" 2>/dev/null || true
282fi
283
284# Set password
285echo "{username}:{password}" | chpasswd 2>/dev/null || true
286
287# Add user to sudoers
288echo "{username} ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/{username} 2>/dev/null || true
289chmod 0440 /etc/sudoers.d/{username} 2>/dev/null || true
290
291# Configure SSH
292mkdir -p /etc/ssh
293cat > /etc/ssh/sshd_config <<EOF
294Port {ssh_port}
295ListenAddress 0.0.0.0
296PermitRootLogin yes
297PasswordAuthentication yes
298PubkeyAuthentication yes
299UseDNS no
300X11Forwarding yes
301PrintMotd no
302AcceptEnv LANG LC_*
303Subsystem sftp internal-sftp
304EOF
305
306# Start SSH daemon
307if command -v sshd >/dev/null 2>&1; then
308 # Use absolute path to sshd if possible
309 SSHD_BIN=$(command -v sshd)
310 $SSHD_BIN -f /etc/ssh/sshd_config -D &
311 echo "SSH server started on port {ssh_port}"
312else
313 echo "Warning: sshd not found"
314fi
315
316echo "Container ready. SSH on port {ssh_port}, user: {username} (shell: $DEFAULT_SHELL)"
317tail -f /dev/null
318"#,
319 ssh_port = ssh_port,
320 username = username,
321 password = password
322 ),
323 ]),
324 resources: Some(k8s_openapi::api::core::v1::ResourceRequirements {
325 limits: Some({
326 let mut limits = std::collections::BTreeMap::new();
327 limits.insert("memory".to_string(), k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!("{}Mi", memory_mb)));
328 limits.insert("cpu".to_string(), k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!("{}m", cpu_millicores)));
329 limits
330 }),
331 requests: Some({
332 let mut requests = std::collections::BTreeMap::new();
333 requests.insert("memory".to_string(), k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!("{}Mi", memory_mb)));
334 requests.insert("cpu".to_string(), k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!("{}m", cpu_millicores)));
335 requests
336 }),
337 claims: None,
338 }),
339 ..Default::default()
340 }];
341
342
343 let pod = Pod {
345 metadata: kube::core::ObjectMeta {
346 name: Some(pod_name.to_string()),
347 labels: Some(labels.clone()),
348 annotations: Some(annotations),
349 ..Default::default()
350 },
351 spec: Some(PodSpec {
352 containers,
353 volumes: None,
354 restart_policy: Some("Never".to_string()),
355 active_deadline_seconds: Some(duration_seconds as i64), host_network: Some(true), dns_policy: Some("ClusterFirstWithHostNet".to_string()), ..Default::default()
359 }),
360 ..Default::default()
361 };
362
363 let pp = PostParams::default();
365 pods.create(&pp, &pod).await.map_err(|e| format!("Failed to create pod: {}", e))?;
366
367 info!("Waiting for pod {} to be ready...", pod_name);
373 let mut attempts = 0;
374 let max_attempts = 30; let mut pod_ready = false;
376
377 while attempts < max_attempts {
378 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
379 attempts += 1;
380
381 match pods.get(pod_name).await {
382 Ok(p) => {
383 if let Some(status) = &p.status {
384 if let Some(phase) = &status.phase {
385 if phase == "Running" {
386 if let Some(container_statuses) = &status.container_statuses {
388 if let Some(container_status) = container_statuses.first() {
389 if container_status.ready {
390 use std::net::TcpStream;
392 match TcpStream::connect_timeout(
393 &std::net::SocketAddr::new(
394 std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
395 ssh_port
396 ),
397 std::time::Duration::from_secs(1)
398 ) {
399 Ok(_) => {
400 pod_ready = true;
401 info!("Pod {} is ready and SSH is listening on port {}", pod_name, ssh_port);
402 break;
403 }
404 Err(_) => {
405 if attempts % 5 == 0 {
406 info!("Pod {} is running but SSH not yet listening on port {} (attempt {}/{})", pod_name, ssh_port, attempts, max_attempts);
407 }
408 }
409 }
410 }
411 }
412 }
413 } else if phase == "Failed" || phase == "Succeeded" {
414 return Err(format!("Pod {} entered {} state", pod_name, phase));
415 }
416 }
417 }
418 }
419 Err(e) => {
420 if attempts % 5 == 0 {
421 warn!("Failed to get pod status: {} (attempt {}/{})", e, attempts, max_attempts);
422 }
423 }
424 }
425 }
426
427 if !pod_ready {
428 warn!("Pod {} may not be fully ready, but proceeding anyway", pod_name);
429 }
430
431 let node_port = ssh_port; info!(
435 pod_name = %pod_name,
436 namespace = %namespace,
437 duration_seconds = %duration_seconds,
438 ssh_port = %ssh_port,
439 username = %username,
440 node_port = %node_port,
441 "SSH pod created with direct host port access (no service overhead)"
442 );
443
444 Ok(node_port)
448 }
449
450
451
452
453
454
455
456
457 pub async fn extend_pod_deadline(&self, namespace: &str, pod_name: &str, additional_duration_seconds: u64) -> Result<(), String> {
458 use kube::api::{Patch, PatchParams};
459 use kube::Api;
460 use k8s_openapi::api::core::v1::Pod;
461 use serde_json::json;
462
463 let pods: Api<Pod> = Api::namespaced(self.client.clone(), namespace);
464
465 let current_pod = pods.get(pod_name).await.map_err(|e| format!("Failed to get pod: {}", e))?;
467
468 let current_deadline_seconds = current_pod
470 .spec
471 .as_ref()
472 .and_then(|spec| spec.active_deadline_seconds)
473 .unwrap_or(0);
474
475 let new_deadline_seconds = current_deadline_seconds + additional_duration_seconds as i64;
476 let new_expires_at = Utc::now() + chrono::Duration::seconds(additional_duration_seconds as i64);
477
478 let patch = json!({
480 "spec": {
481 "activeDeadlineSeconds": new_deadline_seconds
482 },
483 "metadata": {
484 "annotations": {
485 "paygress.io/expires-at": new_expires_at.to_rfc3339(),
486 "paygress.io/extended-at": Utc::now().to_rfc3339()
487 }
488 }
489 });
490
491 let pp = PatchParams::default();
492 let _ = pods.patch(pod_name, &pp, &Patch::Merge(patch)).await
493 .map_err(|e| format!("Failed to update pod deadline: {}", e))?;
494
495 info!(
496 pod_name = %pod_name,
497 namespace = %namespace,
498 additional_seconds = %additional_duration_seconds,
499 new_deadline_seconds = %new_deadline_seconds,
500 "Extended pod activeDeadlineSeconds"
501 );
502
503 Ok(())
504 }
505}
506
507impl SidecarState {
508 pub async fn new(config: SidecarConfig) -> Result<Self, String> {
509 initialize_cashu(&config.cashu_db_path).await
511 .map_err(|e| format!("Cashu init failed: {}", e))?;
512
513 let k8s_client = Arc::new(PodManager::new().await?);
515
516 let active_pods = Arc::new(tokio::sync::RwLock::new(HashMap::new()));
517
518 let port_pool = Arc::new(Mutex::new(PortPool::new(
520 config.ssh_port_range_start,
521 config.ssh_port_range_end,
522 )));
523
524 Ok(Self {
525 config,
526 k8s_client,
527 active_pods,
528 port_pool,
529 })
530 }
531
532 pub fn calculate_duration_from_payment(&self, payment_msats: u64) -> u64 {
534 let msats_per_sec = self.config.pod_specs.first()
535 .map(|spec| spec.rate_msats_per_sec)
536 .unwrap_or(100) .max(1);
538
539 payment_msats / msats_per_sec
541 }
542
543 pub fn generate_password() -> String {
545 use rand::Rng;
546 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
547 let mut rng = rand::thread_rng();
548 (0..16)
549 .map(|_| {
550 let idx = rng.gen_range(0..CHARSET.len());
551 CHARSET[idx] as char
552 })
553 .collect()
554 }
555
556
557
558
559
560
561
562 pub fn is_port_in_use(&self, port: u16) -> bool {
564 use std::net::{TcpListener, SocketAddr};
566 let addr = SocketAddr::from(([0, 0, 0, 0], port));
567 match TcpListener::bind(addr) {
568 Ok(_listener) => {
569 false
571 },
572 Err(_) => {
573 true
575 }
576 }
577 }
578
579
580
581 async fn get_ports_in_use_by_pods(&self) -> Result<HashSet<u16>, String> {
583 use kube::Api;
584 use k8s_openapi::api::core::v1::Pod;
585
586 let pods_api: Api<Pod> = Api::namespaced(self.k8s_client.client.clone(), &self.config.pod_namespace);
587 let pods = pods_api.list(&kube::api::ListParams::default()).await
588 .map_err(|e| format!("Failed to list pods: {}", e))?;
589
590 let mut used_ports = HashSet::new();
591 for pod in pods.items {
592 if let Some(spec) = &pod.spec {
593 for container in &spec.containers {
594 if let Some(ports) = &container.ports {
595 for port in ports {
596 if let Some(host_port) = port.host_port {
597 used_ports.insert(host_port as u16);
598 }
599 }
600 }
601 }
602 }
603 }
604
605 info!("Found {} ports in use by existing pods: {:?}", used_ports.len(), used_ports);
606 Ok(used_ports)
607 }
608
609 pub async fn generate_ssh_port(&self) -> Result<u16, String> {
611 let pods_using_ports = self.get_ports_in_use_by_pods().await
613 .unwrap_or_else(|e| {
614 warn!("Failed to get ports in use by pods: {}", e);
615 HashSet::new()
616 });
617
618 let allocated_ports: Vec<u16> = {
620 let port_pool = self.port_pool.lock().map_err(|e| format!("Failed to lock port pool: {}", e))?;
621 port_pool.allocated_ports.iter().cloned().collect()
622 };
623
624 for port in allocated_ports {
626 if !self.is_port_in_use(port) && !pods_using_ports.contains(&port) {
627 let mut port_pool = self.port_pool.lock().map_err(|e| format!("Failed to lock port pool: {}", e))?;
629 port_pool.allocated_ports.remove(&port);
630 port_pool.available_ports.insert(port);
631 info!("Port {} was marked allocated but is actually free - moving back to available", port);
632 }
633 }
634
635 let available_ports: Vec<u16> = {
637 let port_pool = self.port_pool.lock().map_err(|e| format!("Failed to lock port pool: {}", e))?;
638 port_pool.available_ports.iter().cloned().collect()
639 };
640
641 for port in available_ports {
642 if !self.is_port_in_use(port) && !pods_using_ports.contains(&port) {
644 let delay_ms = (port % 10) + 1;
646 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms as u64)).await;
647
648 if !self.is_port_in_use(port) && !pods_using_ports.contains(&port) {
650 let mut port_pool = self.port_pool.lock().map_err(|e| format!("Failed to lock port pool: {}", e))?;
652 port_pool.available_ports.remove(&port);
653 port_pool.allocated_ports.insert(port);
654
655 info!("✅ Allocated unique SSH port {} from pool ({} available, {} allocated)",
656 port, port_pool.available_count(), port_pool.allocated_count());
657 return Ok(port);
658 }
659 }
660
661 let mut port_pool = self.port_pool.lock().map_err(|e| format!("Failed to lock port pool: {}", e))?;
663 port_pool.available_ports.remove(&port);
664 if pods_using_ports.contains(&port) {
665 warn!("Port {} is in use by existing pod - removed from pool", port);
666 } else {
667 warn!("Port {} is in use by system - removed from pool", port);
668 }
669 }
670
671 let start_port = self.config.ssh_port_range_start;
673 let end_port = self.config.ssh_port_range_end;
674
675 info!("No ports available in pool, searching range {}-{}", start_port, end_port);
676
677 for port in start_port..=end_port {
678 if pods_using_ports.contains(&port) {
680 continue;
681 }
682
683 let is_allocated = {
685 let port_pool = self.port_pool.lock().map_err(|e| format!("Failed to lock port pool: {}", e))?;
686 port_pool.allocated_ports.contains(&port)
687 };
688
689 if is_allocated {
690 continue;
691 }
692
693 if !self.is_port_in_use(port) {
695 let mut port_pool = self.port_pool.lock().map_err(|e| format!("Failed to lock port pool: {}", e))?;
697 port_pool.allocated_ports.insert(port);
698
699 info!("✅ Allocated unique SSH port {} from range ({} available, {} allocated)",
700 port, port_pool.available_count(), port_pool.allocated_count());
701 return Ok(port);
702 }
703 }
704
705 Err("No available ports in the configured range".to_string())
706 }
707
708
709
710
711}
712
713pub async fn extract_token_value(token: &str) -> Result<u64, String> {
715 crate::cashu::extract_token_value(token).await
716 .map_err(|e| e.to_string())
717}
718
719