1use anyhow::{Context, Result};
6use reqwest::{Client, header};
7use serde::{Deserialize, Serialize};
8use tracing::{info, warn, error};
9
10pub struct ProxmoxClient {
12 client: Client,
13 base_url: String,
14 auth_header: String,
15 node: String,
16}
17
18#[derive(Debug, Clone, Serialize)]
20pub struct LxcConfig {
21 pub vmid: u32,
22 pub hostname: String,
23 pub ostemplate: String,
24 pub storage: String,
25 pub rootfs: String,
26 pub memory: u32,
27 pub cores: u32,
28 pub net0: String,
29 pub password: String,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub ssh_public_keys: Option<String>,
32 #[serde(default = "default_true")]
33 pub start: bool,
34 #[serde(default = "default_true")]
35 pub unprivileged: bool,
36}
37
/// Returns `true`; referenced by name from `#[serde(default = "default_true")]`
/// attributes in this file.
fn default_true() -> bool { true }
41
42#[derive(Debug, Clone, Serialize)]
44pub struct VmConfig {
45 pub vmid: u32,
46 pub name: String,
47 pub memory: u32,
48 pub cores: u32,
49 pub sockets: u32,
50 pub ide2: String, pub scsi0: String, pub net0: String,
53 pub ostype: String,
54 #[serde(default = "default_true")]
55 pub start: bool,
56}
57
58#[derive(Debug, Clone, Deserialize)]
60pub struct NodeStatus {
61 pub cpu: f64,
62 pub memory: MemoryInfo,
63 pub uptime: u64,
64 #[serde(default)]
65 pub loadavg: Vec<f64>,
66}
67
68#[derive(Debug, Clone, Deserialize)]
69pub struct MemoryInfo {
70 pub total: u64,
71 pub used: u64,
72 pub free: u64,
73}
74
75#[derive(Debug, Clone, Deserialize)]
77pub struct WorkloadStatus {
78 pub vmid: u32,
79 pub status: String,
80 pub name: String,
81 #[serde(default)]
82 pub uptime: u64,
83 #[serde(default)]
84 pub cpu: f64,
85 #[serde(default)]
86 pub mem: u64,
87 #[serde(default)]
88 pub maxmem: u64,
89}
90
91#[derive(Debug, Deserialize)]
93struct ProxmoxResponse<T> {
94 data: Option<T>,
95 #[serde(default)]
96 errors: Option<serde_json::Value>,
97}
98
99#[derive(Debug, Deserialize)]
101struct TaskResponse {
102 data: String, }
104
105impl ProxmoxClient {
106 pub fn new(api_url: &str, token_id: &str, token_secret: &str, node: &str) -> Result<Self> {
114 let client = Client::builder()
116 .danger_accept_invalid_certs(true) .build()
118 .context("Failed to create HTTP client")?;
119
120 let auth_header = format!("PVEAPIToken={}={}", token_id, token_secret);
121
122 Ok(Self {
123 client,
124 base_url: api_url.trim_end_matches('/').to_string(),
125 auth_header,
126 node: node.to_string(),
127 })
128 }
129
130 fn node_url(&self) -> String {
132 format!("{}/nodes/{}", self.base_url, self.node)
133 }
134
135 pub async fn create_lxc(&self, config: &LxcConfig) -> Result<String> {
139 let url = format!("{}/lxc", self.node_url());
140
141 info!("Creating LXC container {} on node {}", config.vmid, self.node);
142
143 let response = self.client
144 .post(&url)
145 .header(header::AUTHORIZATION, &self.auth_header)
146 .form(config)
147 .send()
148 .await
149 .context("Failed to send create LXC request")?;
150
151 if !response.status().is_success() {
152 let status = response.status();
153 let body = response.text().await.unwrap_or_default();
154 error!("Failed to create LXC: {} - {}", status, body);
155 anyhow::bail!("Proxmox API error: {} - {}", status, body);
156 }
157
158 let task: TaskResponse = response.json().await
159 .context("Failed to parse create LXC response")?;
160
161 info!("LXC creation task started: {}", task.data);
162 Ok(task.data)
163 }
164
165 pub async fn start_lxc(&self, vmid: u32) -> Result<String> {
167 let url = format!("{}/lxc/{}/status/start", self.node_url(), vmid);
168
169 info!("Starting LXC container {}", vmid);
170
171 let response = self.client
172 .post(&url)
173 .header(header::AUTHORIZATION, &self.auth_header)
174 .send()
175 .await
176 .context("Failed to send start LXC request")?;
177
178 if !response.status().is_success() {
179 let status = response.status();
180 let body = response.text().await.unwrap_or_default();
181 anyhow::bail!("Failed to start LXC {}: {} - {}", vmid, status, body);
182 }
183
184 let task: TaskResponse = response.json().await
185 .context("Failed to parse start LXC response")?;
186
187 Ok(task.data)
188 }
189
190 pub async fn stop_lxc(&self, vmid: u32) -> Result<String> {
192 let url = format!("{}/lxc/{}/status/stop", self.node_url(), vmid);
193
194 info!("Stopping LXC container {}", vmid);
195
196 let response = self.client
197 .post(&url)
198 .header(header::AUTHORIZATION, &self.auth_header)
199 .send()
200 .await
201 .context("Failed to send stop LXC request")?;
202
203 if !response.status().is_success() {
204 let status = response.status();
205 let body = response.text().await.unwrap_or_default();
206 anyhow::bail!("Failed to stop LXC {}: {} - {}", vmid, status, body);
207 }
208
209 let task: TaskResponse = response.json().await
210 .context("Failed to parse stop LXC response")?;
211
212 Ok(task.data)
213 }
214
215 pub async fn delete_lxc(&self, vmid: u32) -> Result<String> {
217 let url = format!("{}/lxc/{}", self.node_url(), vmid);
218
219 info!("Deleting LXC container {}", vmid);
220
221 let response = self.client
222 .delete(&url)
223 .header(header::AUTHORIZATION, &self.auth_header)
224 .send()
225 .await
226 .context("Failed to send delete LXC request")?;
227
228 if !response.status().is_success() {
229 let status = response.status();
230 let body = response.text().await.unwrap_or_default();
231 anyhow::bail!("Failed to delete LXC {}: {} - {}", vmid, status, body);
232 }
233
234 let task: TaskResponse = response.json().await
235 .context("Failed to parse delete LXC response")?;
236
237 Ok(task.data)
238 }
239
240 pub async fn get_lxc_status(&self, vmid: u32) -> Result<WorkloadStatus> {
242 let url = format!("{}/lxc/{}/status/current", self.node_url(), vmid);
243
244 let response = self.client
245 .get(&url)
246 .header(header::AUTHORIZATION, &self.auth_header)
247 .send()
248 .await
249 .context("Failed to get LXC status")?;
250
251 if !response.status().is_success() {
252 let status = response.status();
253 let body = response.text().await.unwrap_or_default();
254 anyhow::bail!("Failed to get LXC {} status: {} - {}", vmid, status, body);
255 }
256
257 let resp: ProxmoxResponse<WorkloadStatus> = response.json().await
258 .context("Failed to parse LXC status response")?;
259
260 resp.data.context("No status data returned")
261 }
262
263 pub async fn list_lxc(&self) -> Result<Vec<WorkloadStatus>> {
265 let url = format!("{}/lxc", self.node_url());
266
267 let response = self.client
268 .get(&url)
269 .header(header::AUTHORIZATION, &self.auth_header)
270 .send()
271 .await
272 .context("Failed to list LXC containers")?;
273
274 if !response.status().is_success() {
275 let status = response.status();
276 let body = response.text().await.unwrap_or_default();
277 anyhow::bail!("Failed to list LXC containers: {} - {}", status, body);
278 }
279
280 let resp: ProxmoxResponse<Vec<WorkloadStatus>> = response.json().await
281 .context("Failed to parse LXC list response")?;
282
283 Ok(resp.data.unwrap_or_default())
284 }
285
286 pub async fn create_vm(&self, config: &VmConfig) -> Result<String> {
290 let url = format!("{}/qemu", self.node_url());
291
292 info!("Creating VM {} on node {}", config.vmid, self.node);
293
294 let response = self.client
295 .post(&url)
296 .header(header::AUTHORIZATION, &self.auth_header)
297 .form(config)
298 .send()
299 .await
300 .context("Failed to send create VM request")?;
301
302 if !response.status().is_success() {
303 let status = response.status();
304 let body = response.text().await.unwrap_or_default();
305 error!("Failed to create VM: {} - {}", status, body);
306 anyhow::bail!("Proxmox API error: {} - {}", status, body);
307 }
308
309 let task: TaskResponse = response.json().await
310 .context("Failed to parse create VM response")?;
311
312 info!("VM creation task started: {}", task.data);
313 Ok(task.data)
314 }
315
316 pub async fn start_vm(&self, vmid: u32) -> Result<String> {
318 let url = format!("{}/qemu/{}/status/start", self.node_url(), vmid);
319
320 info!("Starting VM {}", vmid);
321
322 let response = self.client
323 .post(&url)
324 .header(header::AUTHORIZATION, &self.auth_header)
325 .send()
326 .await
327 .context("Failed to send start VM request")?;
328
329 if !response.status().is_success() {
330 let status = response.status();
331 let body = response.text().await.unwrap_or_default();
332 anyhow::bail!("Failed to start VM {}: {} - {}", vmid, status, body);
333 }
334
335 let task: TaskResponse = response.json().await
336 .context("Failed to parse start VM response")?;
337
338 Ok(task.data)
339 }
340
341 pub async fn stop_vm(&self, vmid: u32) -> Result<String> {
343 let url = format!("{}/qemu/{}/status/stop", self.node_url(), vmid);
344
345 info!("Stopping VM {}", vmid);
346
347 let response = self.client
348 .post(&url)
349 .header(header::AUTHORIZATION, &self.auth_header)
350 .send()
351 .await
352 .context("Failed to send stop VM request")?;
353
354 if !response.status().is_success() {
355 let status = response.status();
356 let body = response.text().await.unwrap_or_default();
357 anyhow::bail!("Failed to stop VM {}: {} - {}", vmid, status, body);
358 }
359
360 let task: TaskResponse = response.json().await
361 .context("Failed to parse stop VM response")?;
362
363 Ok(task.data)
364 }
365
366 pub async fn delete_vm(&self, vmid: u32) -> Result<String> {
368 let url = format!("{}/qemu/{}", self.node_url(), vmid);
369
370 info!("Deleting VM {}", vmid);
371
372 let response = self.client
373 .delete(&url)
374 .header(header::AUTHORIZATION, &self.auth_header)
375 .send()
376 .await
377 .context("Failed to send delete VM request")?;
378
379 if !response.status().is_success() {
380 let status = response.status();
381 let body = response.text().await.unwrap_or_default();
382 anyhow::bail!("Failed to delete VM {}: {} - {}", vmid, status, body);
383 }
384
385 let task: TaskResponse = response.json().await
386 .context("Failed to parse delete VM response")?;
387
388 Ok(task.data)
389 }
390
391 pub async fn get_vm_status(&self, vmid: u32) -> Result<WorkloadStatus> {
393 let url = format!("{}/qemu/{}/status/current", self.node_url(), vmid);
394
395 let response = self.client
396 .get(&url)
397 .header(header::AUTHORIZATION, &self.auth_header)
398 .send()
399 .await
400 .context("Failed to get VM status")?;
401
402 if !response.status().is_success() {
403 let status = response.status();
404 let body = response.text().await.unwrap_or_default();
405 anyhow::bail!("Failed to get VM {} status: {} - {}", vmid, status, body);
406 }
407
408 let resp: ProxmoxResponse<WorkloadStatus> = response.json().await
409 .context("Failed to parse VM status response")?;
410
411 resp.data.context("No status data returned")
412 }
413
414 pub async fn list_vm(&self) -> Result<Vec<WorkloadStatus>> {
416 let url = format!("{}/qemu", self.node_url());
417
418 let response = self.client
419 .get(&url)
420 .header(header::AUTHORIZATION, &self.auth_header)
421 .send()
422 .await
423 .context("Failed to list VMs")?;
424
425 if !response.status().is_success() {
426 let status = response.status();
427 let body = response.text().await.unwrap_or_default();
428 anyhow::bail!("Failed to list VMs: {} - {}", status, body);
429 }
430
431 let resp: ProxmoxResponse<Vec<WorkloadStatus>> = response.json().await
432 .context("Failed to parse VM list response")?;
433
434 Ok(resp.data.unwrap_or_default())
435 }
436
437 pub async fn get_node_status(&self) -> Result<NodeStatus> {
441 let url = format!("{}/status", self.node_url());
442
443 let response = self.client
444 .get(&url)
445 .header(header::AUTHORIZATION, &self.auth_header)
446 .send()
447 .await
448 .context("Failed to get node status")?;
449
450 if !response.status().is_success() {
451 let status = response.status();
452 let body = response.text().await.unwrap_or_default();
453 anyhow::bail!("Failed to get node status: {} - {}", status, body);
454 }
455
456 let resp: ProxmoxResponse<NodeStatus> = response.json().await
457 .context("Failed to parse node status response")?;
458
459 resp.data.context("No node status data returned")
460 }
461
462 pub async fn find_available_vmid(&self, range_start: u32, range_end: u32) -> Result<u32> {
464 let lxc_list = self.list_lxc().await?;
465 let vm_list = self.list_vm().await?;
466
467 let used_ids: std::collections::HashSet<u32> = lxc_list.iter()
468 .chain(vm_list.iter())
469 .map(|w| w.vmid)
470 .collect();
471
472 for vmid in range_start..=range_end {
473 if !used_ids.contains(&vmid) {
474 return Ok(vmid);
475 }
476 }
477
478 anyhow::bail!("No available VMID in range {}-{}", range_start, range_end)
479 }
480
481 pub async fn wait_for_task(&self, upid: &str, timeout_secs: u64) -> Result<()> {
483 use tokio::time::{sleep, Duration};
484
485 let start = std::time::Instant::now();
486 let timeout = Duration::from_secs(timeout_secs);
487
488 loop {
489 if start.elapsed() > timeout {
490 anyhow::bail!("Task {} timed out after {} seconds", upid, timeout_secs);
491 }
492
493 let url = format!("{}/tasks/{}/status", self.node_url(), upid);
494
495 let response = self.client
496 .get(&url)
497 .header(header::AUTHORIZATION, &self.auth_header)
498 .send()
499 .await?;
500
501 if response.status().is_success() {
502 #[derive(Deserialize)]
503 struct TaskStatus {
504 status: String,
505 #[serde(default)]
506 exitstatus: Option<String>,
507 }
508
509 let resp: ProxmoxResponse<TaskStatus> = response.json().await?;
510
511 if let Some(task) = resp.data {
512 if task.status == "stopped" {
513 if let Some(exit) = task.exitstatus {
514 if exit == "OK" {
515 return Ok(());
516 } else {
517 anyhow::bail!("Task failed with: {}", exit);
518 }
519 }
520 return Ok(());
521 }
522 }
523 }
524
525 sleep(Duration::from_secs(2)).await;
526 }
527 }
528}
529
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the form encoding of an [`LxcConfig`]: required fields are
    /// present, booleans are written as `true`, and an unset
    /// `ssh_public_keys` is omitted from the body.
    #[test]
    fn test_lxc_config_serialization() {
        let config = LxcConfig {
            vmid: 100,
            hostname: "test-container".to_string(),
            ostemplate: "local:vztmpl/ubuntu-22.04-standard.tar.zst".to_string(),
            storage: "local-lvm".to_string(),
            rootfs: "local-lvm:8".to_string(),
            memory: 1024,
            cores: 1,
            net0: "name=eth0,bridge=vmbr0,ip=dhcp".to_string(),
            password: "testpass".to_string(),
            ssh_public_keys: None,
            start: true,
            unprivileged: true,
        };

        let serialized = serde_urlencoded::to_string(&config)
            .expect("LxcConfig should serialize to a form body");

        assert!(serialized.contains("vmid=100"), "body: {}", serialized);
        assert!(serialized.contains("hostname=test-container"), "body: {}", serialized);
        assert!(serialized.contains("memory=1024"), "body: {}", serialized);
        assert!(serialized.contains("cores=1"), "body: {}", serialized);
        assert!(serialized.contains("start=true"), "body: {}", serialized);
        assert!(serialized.contains("unprivileged=true"), "body: {}", serialized);
        // `skip_serializing_if` must drop the key entirely when no key is set.
        assert!(!serialized.contains("ssh_public_keys"), "body: {}", serialized);
    }
}
555
556use async_trait::async_trait;
559use crate::compute::{ComputeBackend, ContainerConfig, NodeStatus as ComputeNodeStatus};
560
561pub struct ProxmoxBackend {
563 client: ProxmoxClient,
564 storage: String,
565 bridge: String,
566 template: String,
567}
568
569impl ProxmoxBackend {
570 pub fn new(client: ProxmoxClient, storage: &str, bridge: &str, template: &str) -> Self {
571 Self {
572 client,
573 storage: storage.to_string(),
574 bridge: bridge.to_string(),
575 template: template.to_string(),
576 }
577 }
578}
579
580#[async_trait]
581impl ComputeBackend for ProxmoxBackend {
582 async fn find_available_id(&self, range_start: u32, range_end: u32) -> Result<u32> {
583 self.client.find_available_vmid(range_start, range_end).await
584 }
585
586 async fn create_container(&self, config: &ContainerConfig) -> Result<String> {
587 let lxc = LxcConfig {
592 vmid: config.id,
593 hostname: config.name.clone(),
594 ostemplate: self.template.clone(),
595 storage: self.storage.clone(),
596 rootfs: format!("{}:8", self.storage),
597 memory: config.memory_mb,
598 cores: config.cpu_cores,
599 net0: format!("name=eth0,bridge={},ip=dhcp", self.bridge),
600 password: config.password.clone(),
601 ssh_public_keys: config.ssh_key.clone(),
602 start: true,
603 unprivileged: true,
604 };
605
606 let task = self.client.create_lxc(&lxc).await?;
607 self.client.wait_for_task(&task, 120).await?;
608 Ok(config.id.to_string())
609 }
610
611 async fn start_container(&self, id: u32) -> Result<()> {
612 let task = self.client.start_lxc(id).await?;
613 self.client.wait_for_task(&task, 60).await?;
614 Ok(())
615 }
616
617 async fn stop_container(&self, id: u32) -> Result<()> {
618 let task = self.client.stop_lxc(id).await?;
619 self.client.wait_for_task(&task, 60).await?;
620 Ok(())
621 }
622
623 async fn delete_container(&self, id: u32) -> Result<()> {
624 let task = self.client.delete_lxc(id).await?;
625 self.client.wait_for_task(&task, 60).await?;
626 Ok(())
627 }
628
629 async fn get_node_status(&self) -> Result<ComputeNodeStatus> {
630 let status = self.client.get_node_status().await?;
631 Ok(ComputeNodeStatus {
632 cpu_usage: status.cpu,
633 memory_used: status.memory.used,
634 memory_total: status.memory.total,
635 disk_used: 0,
636 disk_total: 0,
637 })
638 }
639
640 async fn get_container_ip(&self, _id: u32) -> Result<Option<String>> {
641 Ok(None)
643 }
644}