1use anyhow::{Context, Result};
6use reqwest::{header, Client};
7use serde::{Deserialize, Serialize};
8use tracing::{error, info};
9
10pub struct ProxmoxClient {
12 client: Client,
13 base_url: String,
14 auth_header: String,
15 node: String,
16}
17
18#[derive(Debug, Clone, Serialize)]
20pub struct LxcConfig {
21 pub vmid: u32,
22 pub hostname: String,
23 pub ostemplate: String,
24 pub storage: String,
25 pub rootfs: String,
26 pub memory: u32,
27 pub cores: u32,
28 pub net0: String,
29 pub password: String,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub ssh_public_keys: Option<String>,
32 #[serde(default = "default_true")]
33 pub start: bool,
34 #[serde(default = "default_true")]
35 pub unprivileged: bool,
36}
37
/// Serde default provider for boolean fields that should default to `true`.
fn default_true() -> bool {
    true
}
41
42#[derive(Debug, Clone, Serialize)]
44pub struct VmConfig {
45 pub vmid: u32,
46 pub name: String,
47 pub memory: u32,
48 pub cores: u32,
49 pub sockets: u32,
50 pub ide2: String, pub scsi0: String, pub net0: String,
53 pub ostype: String,
54 #[serde(default = "default_true")]
55 pub start: bool,
56}
57
58#[derive(Debug, Clone, Deserialize)]
60pub struct NodeStatus {
61 pub cpu: f64,
62 pub memory: MemoryInfo,
63 pub uptime: u64,
64 #[serde(default)]
65 pub loadavg: Vec<f64>,
66}
67
68#[derive(Debug, Clone, Deserialize)]
69pub struct MemoryInfo {
70 pub total: u64,
71 pub used: u64,
72 pub free: u64,
73}
74
75#[derive(Debug, Clone, Deserialize)]
77pub struct WorkloadStatus {
78 pub vmid: u32,
79 pub status: String,
80 pub name: String,
81 #[serde(default)]
82 pub uptime: u64,
83 #[serde(default)]
84 pub cpu: f64,
85 #[serde(default)]
86 pub mem: u64,
87 #[serde(default)]
88 pub maxmem: u64,
89}
90
91#[derive(Debug, Deserialize)]
93struct ProxmoxResponse<T> {
94 data: Option<T>,
95 #[serde(default)]
96 errors: Option<serde_json::Value>,
97}
98
99#[derive(Debug, Deserialize)]
101struct TaskResponse {
102 data: String, }
104
105impl ProxmoxClient {
106 pub fn new(api_url: &str, token_id: &str, token_secret: &str, node: &str) -> Result<Self> {
114 let client = Client::builder()
116 .danger_accept_invalid_certs(true) .build()
118 .context("Failed to create HTTP client")?;
119
120 let auth_header = format!("PVEAPIToken={}={}", token_id, token_secret);
121
122 Ok(Self {
123 client,
124 base_url: api_url.trim_end_matches('/').to_string(),
125 auth_header,
126 node: node.to_string(),
127 })
128 }
129
130 fn node_url(&self) -> String {
132 format!("{}/nodes/{}", self.base_url, self.node)
133 }
134
135 pub async fn create_lxc(&self, config: &LxcConfig) -> Result<String> {
139 let url = format!("{}/lxc", self.node_url());
140
141 info!(
142 "Creating LXC container {} on node {}",
143 config.vmid, self.node
144 );
145
146 let response = self
147 .client
148 .post(&url)
149 .header(header::AUTHORIZATION, &self.auth_header)
150 .form(config)
151 .send()
152 .await
153 .context("Failed to send create LXC request")?;
154
155 if !response.status().is_success() {
156 let status = response.status();
157 let body = response.text().await.unwrap_or_default();
158 error!("Failed to create LXC: {} - {}", status, body);
159 anyhow::bail!("Proxmox API error: {} - {}", status, body);
160 }
161
162 let task: TaskResponse = response
163 .json()
164 .await
165 .context("Failed to parse create LXC response")?;
166
167 info!("LXC creation task started: {}", task.data);
168 Ok(task.data)
169 }
170
171 pub async fn start_lxc(&self, vmid: u32) -> Result<String> {
173 let url = format!("{}/lxc/{}/status/start", self.node_url(), vmid);
174
175 info!("Starting LXC container {}", vmid);
176
177 let response = self
178 .client
179 .post(&url)
180 .header(header::AUTHORIZATION, &self.auth_header)
181 .send()
182 .await
183 .context("Failed to send start LXC request")?;
184
185 if !response.status().is_success() {
186 let status = response.status();
187 let body = response.text().await.unwrap_or_default();
188 anyhow::bail!("Failed to start LXC {}: {} - {}", vmid, status, body);
189 }
190
191 let task: TaskResponse = response
192 .json()
193 .await
194 .context("Failed to parse start LXC response")?;
195
196 Ok(task.data)
197 }
198
199 pub async fn stop_lxc(&self, vmid: u32) -> Result<String> {
201 let url = format!("{}/lxc/{}/status/stop", self.node_url(), vmid);
202
203 info!("Stopping LXC container {}", vmid);
204
205 let response = self
206 .client
207 .post(&url)
208 .header(header::AUTHORIZATION, &self.auth_header)
209 .send()
210 .await
211 .context("Failed to send stop LXC request")?;
212
213 if !response.status().is_success() {
214 let status = response.status();
215 let body = response.text().await.unwrap_or_default();
216 anyhow::bail!("Failed to stop LXC {}: {} - {}", vmid, status, body);
217 }
218
219 let task: TaskResponse = response
220 .json()
221 .await
222 .context("Failed to parse stop LXC response")?;
223
224 Ok(task.data)
225 }
226
227 pub async fn delete_lxc(&self, vmid: u32) -> Result<String> {
229 let url = format!("{}/lxc/{}", self.node_url(), vmid);
230
231 info!("Deleting LXC container {}", vmid);
232
233 let response = self
234 .client
235 .delete(&url)
236 .header(header::AUTHORIZATION, &self.auth_header)
237 .send()
238 .await
239 .context("Failed to send delete LXC request")?;
240
241 if !response.status().is_success() {
242 let status = response.status();
243 let body = response.text().await.unwrap_or_default();
244 anyhow::bail!("Failed to delete LXC {}: {} - {}", vmid, status, body);
245 }
246
247 let task: TaskResponse = response
248 .json()
249 .await
250 .context("Failed to parse delete LXC response")?;
251
252 Ok(task.data)
253 }
254
255 pub async fn get_lxc_status(&self, vmid: u32) -> Result<WorkloadStatus> {
257 let url = format!("{}/lxc/{}/status/current", self.node_url(), vmid);
258
259 let response = self
260 .client
261 .get(&url)
262 .header(header::AUTHORIZATION, &self.auth_header)
263 .send()
264 .await
265 .context("Failed to get LXC status")?;
266
267 if !response.status().is_success() {
268 let status = response.status();
269 let body = response.text().await.unwrap_or_default();
270 anyhow::bail!("Failed to get LXC {} status: {} - {}", vmid, status, body);
271 }
272
273 let resp: ProxmoxResponse<WorkloadStatus> = response
274 .json()
275 .await
276 .context("Failed to parse LXC status response")?;
277
278 resp.data.context("No status data returned")
279 }
280
281 pub async fn list_lxc(&self) -> Result<Vec<WorkloadStatus>> {
283 let url = format!("{}/lxc", self.node_url());
284
285 let response = self
286 .client
287 .get(&url)
288 .header(header::AUTHORIZATION, &self.auth_header)
289 .send()
290 .await
291 .context("Failed to list LXC containers")?;
292
293 if !response.status().is_success() {
294 let status = response.status();
295 let body = response.text().await.unwrap_or_default();
296 anyhow::bail!("Failed to list LXC containers: {} - {}", status, body);
297 }
298
299 let resp: ProxmoxResponse<Vec<WorkloadStatus>> = response
300 .json()
301 .await
302 .context("Failed to parse LXC list response")?;
303
304 Ok(resp.data.unwrap_or_default())
305 }
306
307 pub async fn create_vm(&self, config: &VmConfig) -> Result<String> {
311 let url = format!("{}/qemu", self.node_url());
312
313 info!("Creating VM {} on node {}", config.vmid, self.node);
314
315 let response = self
316 .client
317 .post(&url)
318 .header(header::AUTHORIZATION, &self.auth_header)
319 .form(config)
320 .send()
321 .await
322 .context("Failed to send create VM request")?;
323
324 if !response.status().is_success() {
325 let status = response.status();
326 let body = response.text().await.unwrap_or_default();
327 error!("Failed to create VM: {} - {}", status, body);
328 anyhow::bail!("Proxmox API error: {} - {}", status, body);
329 }
330
331 let task: TaskResponse = response
332 .json()
333 .await
334 .context("Failed to parse create VM response")?;
335
336 info!("VM creation task started: {}", task.data);
337 Ok(task.data)
338 }
339
340 pub async fn start_vm(&self, vmid: u32) -> Result<String> {
342 let url = format!("{}/qemu/{}/status/start", self.node_url(), vmid);
343
344 info!("Starting VM {}", vmid);
345
346 let response = self
347 .client
348 .post(&url)
349 .header(header::AUTHORIZATION, &self.auth_header)
350 .send()
351 .await
352 .context("Failed to send start VM request")?;
353
354 if !response.status().is_success() {
355 let status = response.status();
356 let body = response.text().await.unwrap_or_default();
357 anyhow::bail!("Failed to start VM {}: {} - {}", vmid, status, body);
358 }
359
360 let task: TaskResponse = response
361 .json()
362 .await
363 .context("Failed to parse start VM response")?;
364
365 Ok(task.data)
366 }
367
368 pub async fn stop_vm(&self, vmid: u32) -> Result<String> {
370 let url = format!("{}/qemu/{}/status/stop", self.node_url(), vmid);
371
372 info!("Stopping VM {}", vmid);
373
374 let response = self
375 .client
376 .post(&url)
377 .header(header::AUTHORIZATION, &self.auth_header)
378 .send()
379 .await
380 .context("Failed to send stop VM request")?;
381
382 if !response.status().is_success() {
383 let status = response.status();
384 let body = response.text().await.unwrap_or_default();
385 anyhow::bail!("Failed to stop VM {}: {} - {}", vmid, status, body);
386 }
387
388 let task: TaskResponse = response
389 .json()
390 .await
391 .context("Failed to parse stop VM response")?;
392
393 Ok(task.data)
394 }
395
396 pub async fn delete_vm(&self, vmid: u32) -> Result<String> {
398 let url = format!("{}/qemu/{}", self.node_url(), vmid);
399
400 info!("Deleting VM {}", vmid);
401
402 let response = self
403 .client
404 .delete(&url)
405 .header(header::AUTHORIZATION, &self.auth_header)
406 .send()
407 .await
408 .context("Failed to send delete VM request")?;
409
410 if !response.status().is_success() {
411 let status = response.status();
412 let body = response.text().await.unwrap_or_default();
413 anyhow::bail!("Failed to delete VM {}: {} - {}", vmid, status, body);
414 }
415
416 let task: TaskResponse = response
417 .json()
418 .await
419 .context("Failed to parse delete VM response")?;
420
421 Ok(task.data)
422 }
423
424 pub async fn get_vm_status(&self, vmid: u32) -> Result<WorkloadStatus> {
426 let url = format!("{}/qemu/{}/status/current", self.node_url(), vmid);
427
428 let response = self
429 .client
430 .get(&url)
431 .header(header::AUTHORIZATION, &self.auth_header)
432 .send()
433 .await
434 .context("Failed to get VM status")?;
435
436 if !response.status().is_success() {
437 let status = response.status();
438 let body = response.text().await.unwrap_or_default();
439 anyhow::bail!("Failed to get VM {} status: {} - {}", vmid, status, body);
440 }
441
442 let resp: ProxmoxResponse<WorkloadStatus> = response
443 .json()
444 .await
445 .context("Failed to parse VM status response")?;
446
447 resp.data.context("No status data returned")
448 }
449
450 pub async fn list_vm(&self) -> Result<Vec<WorkloadStatus>> {
452 let url = format!("{}/qemu", self.node_url());
453
454 let response = self
455 .client
456 .get(&url)
457 .header(header::AUTHORIZATION, &self.auth_header)
458 .send()
459 .await
460 .context("Failed to list VMs")?;
461
462 if !response.status().is_success() {
463 let status = response.status();
464 let body = response.text().await.unwrap_or_default();
465 anyhow::bail!("Failed to list VMs: {} - {}", status, body);
466 }
467
468 let resp: ProxmoxResponse<Vec<WorkloadStatus>> = response
469 .json()
470 .await
471 .context("Failed to parse VM list response")?;
472
473 Ok(resp.data.unwrap_or_default())
474 }
475
476 pub async fn get_node_status(&self) -> Result<NodeStatus> {
480 let url = format!("{}/status", self.node_url());
481
482 let response = self
483 .client
484 .get(&url)
485 .header(header::AUTHORIZATION, &self.auth_header)
486 .send()
487 .await
488 .context("Failed to get node status")?;
489
490 if !response.status().is_success() {
491 let status = response.status();
492 let body = response.text().await.unwrap_or_default();
493 anyhow::bail!("Failed to get node status: {} - {}", status, body);
494 }
495
496 let resp: ProxmoxResponse<NodeStatus> = response
497 .json()
498 .await
499 .context("Failed to parse node status response")?;
500
501 resp.data.context("No node status data returned")
502 }
503
504 pub async fn find_available_vmid(&self, range_start: u32, range_end: u32) -> Result<u32> {
506 let lxc_list = self.list_lxc().await?;
507 let vm_list = self.list_vm().await?;
508
509 let used_ids: std::collections::HashSet<u32> = lxc_list
510 .iter()
511 .chain(vm_list.iter())
512 .map(|w| w.vmid)
513 .collect();
514
515 for vmid in range_start..=range_end {
516 if !used_ids.contains(&vmid) {
517 return Ok(vmid);
518 }
519 }
520
521 anyhow::bail!("No available VMID in range {}-{}", range_start, range_end)
522 }
523
524 pub async fn wait_for_task(&self, upid: &str, timeout_secs: u64) -> Result<()> {
526 use tokio::time::{sleep, Duration};
527
528 let start = std::time::Instant::now();
529 let timeout = Duration::from_secs(timeout_secs);
530
531 loop {
532 if start.elapsed() > timeout {
533 anyhow::bail!("Task {} timed out after {} seconds", upid, timeout_secs);
534 }
535
536 let url = format!("{}/tasks/{}/status", self.node_url(), upid);
537
538 let response = self
539 .client
540 .get(&url)
541 .header(header::AUTHORIZATION, &self.auth_header)
542 .send()
543 .await?;
544
545 if response.status().is_success() {
546 #[derive(Deserialize)]
547 struct TaskStatus {
548 status: String,
549 #[serde(default)]
550 exitstatus: Option<String>,
551 }
552
553 let resp: ProxmoxResponse<TaskStatus> = response.json().await?;
554
555 if let Some(task) = resp.data {
556 if task.status == "stopped" {
557 if let Some(exit) = task.exitstatus {
558 if exit == "OK" {
559 return Ok(());
560 } else {
561 anyhow::bail!("Task failed with: {}", exit);
562 }
563 }
564 return Ok(());
565 }
566 }
567 }
568
569 sleep(Duration::from_secs(2)).await;
570 }
571 }
572}
573
#[cfg(test)]
mod tests {
    use super::*;

    /// An `LxcConfig` must encode as a URL-encoded form, carry its scalar fields, and
    /// omit the SSH key parameter entirely when no key is set.
    #[test]
    fn test_lxc_config_serialization() {
        let config = LxcConfig {
            vmid: 100,
            hostname: "test-container".to_string(),
            ostemplate: "local:vztmpl/ubuntu-22.04-standard.tar.zst".to_string(),
            storage: "local-lvm".to_string(),
            rootfs: "local-lvm:8".to_string(),
            memory: 1024,
            cores: 1,
            net0: "name=eth0,bridge=vmbr0,ip=dhcp".to_string(),
            password: "testpass".to_string(),
            ssh_public_keys: None,
            start: true,
            unprivileged: true,
        };

        let serialized = serde_urlencoded::to_string(&config)
            .expect("LxcConfig should serialize as a URL-encoded form");

        assert!(serialized.contains("vmid=100"), "got: {serialized}");
        assert!(
            serialized.contains("hostname=test-container"),
            "got: {serialized}"
        );
        assert!(serialized.contains("start=true"), "got: {serialized}");
        assert!(serialized.contains("unprivileged=true"), "got: {serialized}");
        // `None` keys are skipped, so no SSH-key parameter may appear at all.
        assert!(!serialized.contains("ssh"), "got: {serialized}");
    }
}
599
600use crate::compute::{ComputeBackend, ContainerConfig, NodeStatus as ComputeNodeStatus};
603use async_trait::async_trait;
604
605pub struct ProxmoxBackend {
607 client: ProxmoxClient,
608 storage: String,
609 bridge: String,
610 template: String,
611}
612
613impl ProxmoxBackend {
614 pub fn new(client: ProxmoxClient, storage: &str, bridge: &str, template: &str) -> Self {
615 Self {
616 client,
617 storage: storage.to_string(),
618 bridge: bridge.to_string(),
619 template: template.to_string(),
620 }
621 }
622}
623
624#[async_trait]
625impl ComputeBackend for ProxmoxBackend {
626 async fn find_available_id(&self, range_start: u32, range_end: u32) -> Result<u32> {
627 self.client
628 .find_available_vmid(range_start, range_end)
629 .await
630 }
631
632 async fn create_container(&self, config: &ContainerConfig) -> Result<String> {
633 let lxc = LxcConfig {
638 vmid: config.id,
639 hostname: config.name.clone(),
640 ostemplate: self.template.clone(),
641 storage: self.storage.clone(),
642 rootfs: format!("{}:8", self.storage),
643 memory: config.memory_mb,
644 cores: config.cpu_cores,
645 net0: format!("name=eth0,bridge={},ip=dhcp", self.bridge),
646 password: config.password.clone(),
647 ssh_public_keys: config.ssh_key.clone(),
648 start: true,
649 unprivileged: true,
650 };
651
652 let task = self.client.create_lxc(&lxc).await?;
653 self.client.wait_for_task(&task, 120).await?;
654 Ok(config.id.to_string())
655 }
656
657 async fn start_container(&self, id: u32) -> Result<()> {
658 let task = self.client.start_lxc(id).await?;
659 self.client.wait_for_task(&task, 60).await?;
660 Ok(())
661 }
662
663 async fn stop_container(&self, id: u32) -> Result<()> {
664 let task = self.client.stop_lxc(id).await?;
665 self.client.wait_for_task(&task, 60).await?;
666 Ok(())
667 }
668
669 async fn delete_container(&self, id: u32) -> Result<()> {
670 let task = self.client.delete_lxc(id).await?;
671 self.client.wait_for_task(&task, 60).await?;
672 Ok(())
673 }
674
675 async fn get_node_status(&self) -> Result<ComputeNodeStatus> {
676 let status = self.client.get_node_status().await?;
677 Ok(ComputeNodeStatus {
678 cpu_usage: status.cpu,
679 memory_used: status.memory.used,
680 memory_total: status.memory.total,
681 disk_used: 0,
682 disk_total: 0,
683 })
684 }
685
686 async fn get_container_ip(&self, _id: u32) -> Result<Option<String>> {
687 Ok(None)
689 }
690}