lmrc_k3s/
lib.rs

//! # K3s Manager
//!
//! A comprehensive Rust library for managing K3s Kubernetes clusters via SSH.

pub mod adapter;
mod error;
mod state;

pub use adapter::K3sAdapter;
pub use error::{K3sError, Result};
pub use state::{
    ClusterState, ClusterStatus, DesiredClusterConfig, NodeInfo, NodeStatus, ReconciliationPlan,
    WorkerConfig, WorkerNode,
};

use lmrc_ssh::{AuthMethod, SshClient};
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tracing::{debug, info};

/// Main K3s cluster manager
pub struct K3sManager {
    k3s_version: String,
    token: String,
    disable_components: Vec<String>,
    ssh_username: String,
    ssh_key_path: Option<String>,
    ssh_password: Option<String>,
    state: Arc<RwLock<Option<ClusterState>>>,
}

impl K3sManager {
    /// Create a new K3sManager
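    ///
    /// A minimal construction sketch (illustrative version, token, and disabled components):
    ///
    /// ```rust,no_run
    /// use lmrc_k3s::K3sManager;
    ///
    /// let manager = K3sManager::new(
    ///     "v1.28.5+k3s1".to_string(),
    ///     "my-token".to_string(),
    ///     vec!["traefik".to_string()],
    /// );
    /// ```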
    pub fn new(k3s_version: String, token: String, disable_components: Vec<String>) -> Self {
        Self {
            k3s_version,
            token,
            disable_components,
            ssh_username: "root".to_string(),
            ssh_key_path: None,
            ssh_password: None,
            state: Arc::new(RwLock::new(None)),
        }
    }

    /// Create a new builder
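    ///
    /// A minimal sketch of the builder flow (illustrative version and token):
    ///
    /// ```rust,no_run
    /// use lmrc_k3s::K3sManager;
    ///
    /// let manager = K3sManager::builder()
    ///     .version("v1.28.5+k3s1")
    ///     .token("my-token")
    ///     .disable(vec!["traefik".to_string()])
    ///     .build();
    /// ```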
    pub fn builder() -> K3sManagerBuilder {
        K3sManagerBuilder::default()
    }

    /// Execute SSH command
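    ///
    /// Uses password authentication when an SSH password is configured; otherwise
    /// public key authentication with the configured key path, the `SSH_KEY_PATH`
    /// environment variable, or `.ssh/id_rsa` as the final fallback. Fails with a
    /// `CommandError` if the remote command exits with a non-zero status.
    ///
    /// A minimal usage sketch (illustrative host and command):
    ///
    /// ```rust,no_run
    /// # use lmrc_k3s::K3sManager;
    /// # async fn example() -> Result<(), lmrc_k3s::K3sError> {
    /// # let manager = K3sManager::builder().token("token").build();
    /// let uptime = manager.execute_ssh("192.168.1.10", "uptime").await?;
    /// println!("{}", uptime);
    /// # Ok(())
    /// # }
    /// ```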
    pub async fn execute_ssh(&self, host: &str, command: &str) -> Result<String> {
        debug!("Executing on {}: {}", host, command);

        // Determine authentication method
        let auth = if let Some(ref password) = self.ssh_password {
            AuthMethod::Password {
                username: self.ssh_username.clone(),
                password: password.clone(),
            }
        } else {
            // Use public key authentication: explicit key path, then SSH_KEY_PATH, then project-local .ssh/id_rsa
            let key_path = self.ssh_key_path.clone().unwrap_or_else(|| {
                std::env::var("SSH_KEY_PATH")
                    .unwrap_or_else(|_| ".ssh/id_rsa".to_string())
            });

            AuthMethod::PublicKey {
                username: self.ssh_username.clone(),
                private_key_path: key_path,
                passphrase: None,
            }
        };

        // Create SSH client and connect
        let mut client = SshClient::new(host, 22)
            .map_err(|e| K3sError::ConnectionError {
                host: host.to_string(),
                message: format!("Failed to create SSH client: {}", e),
            })?
            .with_auth(auth)
            .connect()
            .map_err(|e| K3sError::ConnectionError {
                host: host.to_string(),
                message: format!("Failed to connect: {}", e),
            })?;

        // Execute command
        let output = client
            .execute(command)
            .map_err(|e| K3sError::CommandError {
                host: host.to_string(),
                command: command.to_string(),
                exit_code: None,
                message: format!("Execution failed: {}", e),
            })?;

        // Check exit status
        if output.exit_status != 0 {
            return Err(K3sError::CommandError {
                host: host.to_string(),
                command: command.to_string(),
                exit_code: Some(output.exit_status),
                message: format!("stderr: {}", output.stderr),
            });
        }

        Ok(output.stdout)
    }

    /// Check if K3s is installed
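    ///
    /// Runs `which k3s` on the host; any SSH or command failure is treated as
    /// "not installed", so this returns `Ok(false)` rather than an error.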
    pub async fn is_installed(&self, server_ip: &str) -> Result<bool> {
        match self.execute_ssh(server_ip, "which k3s").await {
            Ok(output) => Ok(!output.trim().is_empty()),
            Err(_) => Ok(false),
        }
    }

    /// Install K3s on master
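    ///
    /// # Example
    ///
    /// A sketch with illustrative public and private addresses; with `force` set to
    /// `false`, installation fails if K3s is already present on the node.
    ///
    /// ```rust,no_run
    /// # use lmrc_k3s::K3sManager;
    /// # async fn example() -> Result<(), lmrc_k3s::K3sError> {
    /// # let manager = K3sManager::builder().token("my-token").build();
    /// manager.install_master("192.168.1.10", Some("10.0.0.2"), false).await?;
    /// # Ok(())
    /// # }
    /// ```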
    pub async fn install_master(
        &self,
        server_ip: &str,
        private_ip: Option<&str>,
        force: bool,
    ) -> Result<()> {
        info!("Installing K3s on master node {}", server_ip);

        if !force && self.is_installed(server_ip).await? {
            return Err(K3sError::AlreadyInstalledError {
                node: server_ip.to_string(),
            });
        }

        {
            let mut state = self.state.write().unwrap();
            *state = Some(ClusterState::new(
                server_ip.to_string(),
                private_ip.map(String::from),
                self.k3s_version.clone(),
                self.token.clone(),
                self.disable_components.clone(),
            ));
        }

        let mut install_cmd = if self.k3s_version == "latest" || self.k3s_version.is_empty() {
            format!(
                "curl -sfL https://get.k3s.io | K3S_TOKEN={} sh -s - server --cluster-init",
                self.token
            )
        } else {
            format!(
                "curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION={} K3S_TOKEN={} sh -s - server --cluster-init",
                self.k3s_version, self.token
            )
        };

        if !self.disable_components.is_empty() {
            let disable_args = self
                .disable_components
                .iter()
                .map(|c| format!("--disable {}", c))
                .collect::<Vec<_>>()
                .join(" ");
            install_cmd.push_str(&format!(" {}", disable_args));
        }

        if let Some(priv_ip) = private_ip {
            install_cmd.push_str(&format!(" --node-ip={} --tls-san={}", priv_ip, priv_ip));
        }

        self.execute_ssh(server_ip, &install_cmd).await?;

        // Note: Firewall is now configured via Hetzner's native firewall API
        // during server provisioning, not via UFW. See libs/lmrc-pipeline/src/steps/provision.rs

        self.wait_for_k3s_ready(server_ip, 30, Duration::from_secs(10))
            .await?;

        info!("K3s master installation complete");
        Ok(())
    }

    /// Join worker to cluster
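    ///
    /// # Example
    ///
    /// A sketch with illustrative addresses; the master's private IP recorded in
    /// cluster state is preferred for cluster communication when available.
    ///
    /// ```rust,no_run
    /// # use lmrc_k3s::K3sManager;
    /// # async fn example() -> Result<(), lmrc_k3s::K3sError> {
    /// # let manager = K3sManager::builder().token("my-token").build();
    /// manager
    ///     .join_worker("192.168.1.11", "192.168.1.10", Some("10.0.0.3"))
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```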
    pub async fn join_worker(
        &self,
        worker_ip: &str,
        master_ip: &str,
        worker_private_ip: Option<&str>,
    ) -> Result<()> {
        info!("Joining worker {} to cluster", worker_ip);

        // Determine which IP to use for cluster communication
        // Use master's private IP from cluster state if available, otherwise fall back to public IP
        let master_cluster_ip = {
            let state = self.state.read().unwrap();
            state.as_ref()
                .and_then(|s| s.master_private_ip.clone())
                .unwrap_or_else(|| master_ip.to_string())
        };

        info!("Using master IP {} for cluster communication", master_cluster_ip);

        // Pre-flight checks: Test connectivity from worker to master
        debug!("Running pre-flight connectivity checks...");

        // Check if worker can reach master on port 6443
        debug!("Testing K3s API connectivity from worker to master...");
        let connectivity_test = format!(
            "timeout 10 nc -zv {} 6443 2>&1 || echo 'CONNECTION_FAILED'",
            master_cluster_ip
        );
        let connectivity_result = self.execute_ssh(worker_ip, &connectivity_test).await?;

        if connectivity_result.contains("CONNECTION_FAILED") || connectivity_result.contains("timed out") {
            return Err(K3sError::ConnectionError {
                host: worker_ip.to_string(),
                message: format!(
                    "Worker cannot reach master on port 6443. This usually indicates a firewall issue. \
                     Connectivity test output: {}",
                    connectivity_result
                ),
            });
        }
        debug!("K3s API connectivity check passed");

        // Check if master K3s service is actually running
        debug!("Verifying master K3s service is running...");
        let service_check = "systemctl is-active k3s 2>&1";
        let service_result = self.execute_ssh(master_ip, service_check).await?;

        // Require exactly "active": a substring check would also match "inactive"
        if service_result.trim() != "active" {
            return Err(K3sError::ClusterNotReadyError {
                attempts: 1,
                message: format!(
                    "Master K3s service is not running. Service status: {}",
                    service_result
                ),
            });
        }
        debug!("Master K3s service is running");

        {
            let mut state = self.state.write().unwrap();
            if let Some(ref mut cluster_state) = *state {
                cluster_state
                    .add_worker(worker_ip.to_string(), worker_private_ip.map(String::from));
            }
        }

        let mut install_cmd = if self.k3s_version == "latest" || self.k3s_version.is_empty() {
            format!(
                "curl -sfL https://get.k3s.io | K3S_URL=https://{}:6443 K3S_TOKEN={} sh -s -",
                master_cluster_ip, self.token
            )
        } else {
            format!(
                "curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION={} K3S_URL=https://{}:6443 K3S_TOKEN={} sh -s -",
                self.k3s_version, master_cluster_ip, self.token
            )
        };

        if let Some(priv_ip) = worker_private_ip {
            install_cmd.push_str(&format!(" --node-ip={}", priv_ip));
        }

        info!("Running K3s agent installation on worker (this may take 2-5 minutes)...");
        self.execute_ssh(worker_ip, &install_cmd).await?;

        info!("K3s agent installation completed, waiting for node to be ready...");
        tokio::time::sleep(Duration::from_secs(5)).await;

        {
            let mut state = self.state.write().unwrap();
            if let Some(ref mut cluster_state) = *state {
                cluster_state.update_worker_status(worker_ip, NodeStatus::Ready);
                cluster_state.update_cluster_status();
            }
        }

        info!("Worker joined successfully");
        Ok(())
    }

    /// Download kubeconfig
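    ///
    /// Reads `/etc/rancher/k3s/k3s.yaml` from the master, rewrites the `127.0.0.1`
    /// server address to `master_ip`, and writes the result to `output_path`.
    ///
    /// # Example
    ///
    /// A sketch with an illustrative address and output path:
    ///
    /// ```rust,no_run
    /// # use lmrc_k3s::K3sManager;
    /// # async fn example() -> Result<(), lmrc_k3s::K3sError> {
    /// # let manager = K3sManager::builder().token("token").build();
    /// manager.download_kubeconfig("192.168.1.10", "./kubeconfig.yaml").await?;
    /// # Ok(())
    /// # }
    /// ```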
    pub async fn download_kubeconfig<P: AsRef<Path>>(
        &self,
        master_ip: &str,
        output_path: P,
    ) -> Result<()> {
        info!("Downloading kubeconfig");

        let content = self
            .execute_ssh(master_ip, "cat /etc/rancher/k3s/k3s.yaml")
            .await?;
        let kubeconfig = content.replace("127.0.0.1", master_ip);

        std::fs::write(output_path.as_ref(), kubeconfig).map_err(|e| K3sError::IoError {
            operation: "write kubeconfig".to_string(),
            source: e,
        })?;

        Ok(())
    }

    /// Get cluster nodes
    pub async fn get_nodes(&self, master_ip: &str) -> Result<String> {
        self.execute_ssh(master_ip, "kubectl get nodes").await
    }

    /// Wait for K3s ready
    async fn wait_for_k3s_ready(
        &self,
        server_ip: &str,
        max_attempts: u32,
        wait_duration: Duration,
    ) -> Result<()> {
        for attempt in 1..=max_attempts {
            match self.execute_ssh(server_ip, "kubectl get nodes").await {
                Ok(_) => return Ok(()),
                Err(e) => {
                    if attempt == max_attempts {
                        return Err(K3sError::ClusterNotReadyError {
                            attempts: max_attempts,
                            message: format!("{}", e),
                        });
                    }
                    tokio::time::sleep(wait_duration).await;
                }
            }
        }
        Ok(())
    }

    /// Uninstall K3s
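    ///
    /// Runs the appropriate uninstall script on the node (`k3s-uninstall.sh` for a
    /// master, `k3s-agent-uninstall.sh` for a worker). Script failures are ignored,
    /// so removal is best-effort.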
    pub async fn uninstall(&self, server_ip: &str, is_master: bool) -> Result<()> {
        let script = if is_master {
            "/usr/local/bin/k3s-uninstall.sh"
        } else {
            "/usr/local/bin/k3s-agent-uninstall.sh"
        };

        let _ = self.execute_ssh(server_ip, script).await;
        Ok(())
    }

    /// Get cluster state
    pub fn get_cluster_state(&self) -> Option<ClusterState> {
        self.state.read().unwrap().clone()
    }

    /// Refresh cluster state
    pub async fn refresh_cluster_state(&self, master_ip: &str) -> Result<()> {
        let _ = self.get_nodes(master_ip).await?;

        let mut state = self.state.write().unwrap();
        if let Some(ref mut cluster_state) = *state {
            cluster_state.update_cluster_status();
        }
        Ok(())
    }

    /// Parse kubectl get nodes output into NodeInfo list
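    ///
    /// A minimal usage sketch (illustrative master address):
    ///
    /// ```rust,no_run
    /// # use lmrc_k3s::K3sManager;
    /// # async fn example() -> Result<(), lmrc_k3s::K3sError> {
    /// # let manager = K3sManager::builder().token("token").build();
    /// let nodes = manager.get_node_info_list("192.168.1.10").await?;
    /// println!("cluster reports {} node(s)", nodes.len());
    /// # Ok(())
    /// # }
    /// ```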
    pub async fn get_node_info_list(&self, master_ip: &str) -> Result<Vec<NodeInfo>> {
        let output = self.get_nodes(master_ip).await?;

        let nodes: Vec<NodeInfo> = output
            .lines()
            .skip(1) // Skip header line
            .filter_map(NodeInfo::parse_from_kubectl_line)
            .collect();

        Ok(nodes)
    }

    /// Create a reconciliation plan by comparing desired state with current state
    ///
    /// This method checks what needs to be done to reach the desired cluster configuration.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use lmrc_k3s::{K3sManager, DesiredClusterConfig, WorkerConfig};
    /// # async fn example() -> Result<(), lmrc_k3s::K3sError> {
    /// # let manager = K3sManager::builder().token("token").build();
    /// let desired = DesiredClusterConfig {
    ///     master_ip: "192.168.1.10".to_string(),
    ///     master_private_ip: None,
    ///     workers: vec![
    ///         WorkerConfig::new("192.168.1.11".to_string()),
    ///         WorkerConfig::new("192.168.1.12".to_string()),
    ///     ],
    ///     k3s_version: "v1.28.5+k3s1".to_string(),
    ///     token: "my-token".to_string(),
    ///     disabled_components: vec![],
    /// };
    ///
    /// let plan = manager.plan_reconciliation(&desired).await?;
    /// println!("Plan: {}", plan.summary());
    /// # Ok(())
    /// # }
    /// ```
    pub async fn plan_reconciliation(
        &self,
        desired: &DesiredClusterConfig,
    ) -> Result<ReconciliationPlan> {
        // Check if master is installed
        let master_installed = self.is_installed(&desired.master_ip).await?;

        // Get current worker state
        let current_state = self.get_cluster_state();
        let current_workers: Vec<String> = current_state
            .as_ref()
            .map(|s| s.workers.iter().map(|w| w.ip.clone()).collect())
            .unwrap_or_default();

        let desired_workers: Vec<String> = desired.workers.iter().map(|w| w.ip.clone()).collect();

        // Workers to add = in desired but not in current
        let workers_to_add: Vec<WorkerConfig> = desired
            .workers
            .iter()
            .filter(|w| !current_workers.contains(&w.ip))
            .cloned()
            .collect();

        // Workers to remove = in current but not in desired
        let workers_to_remove: Vec<String> = current_workers
            .into_iter()
            .filter(|ip| !desired_workers.contains(ip))
            .collect();

        Ok(ReconciliationPlan {
            workers_to_add,
            workers_to_remove,
            install_master: !master_installed,
        })
    }

    /// Apply a reconciliation plan to reach the desired state
    ///
    /// This method executes the plan to add/remove nodes as needed.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use lmrc_k3s::{K3sManager, DesiredClusterConfig, WorkerConfig};
    /// # async fn example() -> Result<(), lmrc_k3s::K3sError> {
    /// # let manager = K3sManager::builder().token("token").build();
    /// # let desired = DesiredClusterConfig {
    /// #     master_ip: "192.168.1.10".to_string(),
    /// #     master_private_ip: None,
    /// #     workers: vec![],
    /// #     k3s_version: "v1.28.5+k3s1".to_string(),
    /// #     token: "my-token".to_string(),
    /// #     disabled_components: vec![],
    /// # };
    /// let plan = manager.plan_reconciliation(&desired).await?;
    ///
    /// if plan.has_changes() {
    ///     println!("Applying changes: {}", plan.summary());
    ///     manager.apply_reconciliation(&desired, &plan).await?;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn apply_reconciliation(
        &self,
        desired: &DesiredClusterConfig,
        plan: &ReconciliationPlan,
    ) -> Result<()> {
        // Install master if needed
        if plan.install_master {
            info!("Installing master as part of reconciliation");
            self.install_master(
                &desired.master_ip,
                desired.master_private_ip.as_deref(),
                false,
            )
            .await?;
        }

        // Remove workers first
        for worker_ip in &plan.workers_to_remove {
            info!("Removing worker node: {}", worker_ip);
            self.remove_worker(&desired.master_ip, worker_ip).await?;
        }

        // Add new workers
        for worker in &plan.workers_to_add {
            info!("Adding worker node: {}", worker.ip);
            self.join_worker(&worker.ip, &desired.master_ip, worker.private_ip.as_deref())
                .await?;
        }

        Ok(())
    }

    /// Reconcile cluster to match desired configuration
    ///
    /// This is a convenience method that combines `plan_reconciliation` and `apply_reconciliation`.
    /// It will automatically add or remove workers to match the desired state.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use lmrc_k3s::{K3sManager, DesiredClusterConfig, WorkerConfig};
    /// # async fn example() -> Result<(), lmrc_k3s::K3sError> {
    /// let manager = K3sManager::builder()
    ///     .token("my-token")
    ///     .build();
    ///
    /// // Define desired state
    /// let desired = DesiredClusterConfig {
    ///     master_ip: "192.168.1.10".to_string(),
    ///     master_private_ip: None,
    ///     workers: vec![
    ///         WorkerConfig::new("192.168.1.11".to_string()),
    ///         WorkerConfig::new("192.168.1.12".to_string()),
    ///         WorkerConfig::new("192.168.1.13".to_string()), // New worker
    ///     ],
    ///     k3s_version: "v1.28.5+k3s1".to_string(),
    ///     token: "my-token".to_string(),
    ///     disabled_components: vec!["traefik".to_string()],
    /// };
    ///
    /// // Reconcile - will add worker .13 if not present
    /// manager.reconcile(&desired).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn reconcile(&self, desired: &DesiredClusterConfig) -> Result<()> {
        let plan = self.plan_reconciliation(desired).await?;

        if !plan.has_changes() {
            info!("Cluster is already in desired state");
            return Ok(());
        }

        info!("Reconciliation plan: {}", plan.summary());
        self.apply_reconciliation(desired, &plan).await?;

        info!("Reconciliation complete");
        Ok(())
    }

    /// Remove a worker node from the cluster
    ///
    /// This method drains the node, deletes it from the cluster, and uninstalls K3s.
    ///
    /// # Arguments
    ///
    /// * `master_ip` - The IP of the master node
    /// * `worker_ip` - The IP of the worker node to remove
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use lmrc_k3s::K3sManager;
    /// # async fn example() -> Result<(), lmrc_k3s::K3sError> {
    /// # let manager = K3sManager::builder().token("token").build();
    /// manager.remove_worker("192.168.1.10", "192.168.1.11").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn remove_worker(&self, master_ip: &str, worker_ip: &str) -> Result<()> {
        info!("Removing worker node: {}", worker_ip);

        // Get the node name from kubectl
        let nodes = self.get_node_info_list(master_ip).await?;
        let node_name = nodes
            .iter()
            .find(|n| {
                n.internal_ip.as_ref() == Some(&worker_ip.to_string())
                    || n.external_ip.as_ref() == Some(&worker_ip.to_string())
                    || n.name.contains(worker_ip)
            })
            .map(|n| n.name.clone());

        if let Some(name) = node_name {
            // Drain the node
            info!("Draining node: {}", name);
            let drain_cmd = format!(
                "kubectl drain {} --ignore-daemonsets --delete-emptydir-data --force --timeout=60s",
                name
            );
            let _ = self.execute_ssh(master_ip, &drain_cmd).await;

            // Delete from cluster
            info!("Deleting node from cluster: {}", name);
            let delete_cmd = format!("kubectl delete node {}", name);
            let _ = self.execute_ssh(master_ip, &delete_cmd).await;
        }

        // Uninstall K3s from the worker
        info!("Uninstalling K3s from worker: {}", worker_ip);
        self.uninstall(worker_ip, false).await?;

        // Update state
        {
            let mut state = self.state.write().unwrap();
            if let Some(ref mut cluster_state) = *state {
                cluster_state.workers.retain(|w| w.ip != worker_ip);
                cluster_state.update_cluster_status();
            }
        }

        info!("Worker node removed successfully");
        Ok(())
    }
}

/// Builder for K3sManager
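///
/// A fuller configuration sketch including the SSH options (illustrative values):
///
/// ```rust,no_run
/// use lmrc_k3s::K3sManager;
///
/// let manager = K3sManager::builder()
///     .version("v1.28.5+k3s1")
///     .token("my-token")
///     .ssh_username("root")
///     .ssh_key_path("/home/deploy/.ssh/id_ed25519")
///     .build();
/// ```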
#[derive(Default)]
pub struct K3sManagerBuilder {
    k3s_version: Option<String>,
    token: Option<String>,
    disable_components: Vec<String>,
    ssh_username: Option<String>,
    ssh_key_path: Option<String>,
    ssh_password: Option<String>,
}

impl K3sManagerBuilder {
    /// Set K3s version
    pub fn version(mut self, version: impl Into<String>) -> Self {
        self.k3s_version = Some(version.into());
        self
    }

    /// Set cluster token (required)
    pub fn token(mut self, token: impl Into<String>) -> Self {
        self.token = Some(token.into());
        self
    }

    /// Set components to disable
    pub fn disable(mut self, components: Vec<String>) -> Self {
        self.disable_components = components;
        self
    }

    /// Set SSH username (default: "root")
    pub fn ssh_username(mut self, username: impl Into<String>) -> Self {
        self.ssh_username = Some(username.into());
        self
    }

    /// Set SSH private key path (default: `SSH_KEY_PATH` env var, then project-local `.ssh/id_rsa`)
    pub fn ssh_key_path(mut self, path: impl Into<String>) -> Self {
        self.ssh_key_path = Some(path.into());
        self
    }

    /// Set SSH password for password-based authentication.
    ///
    /// If not set, public key authentication will be used.
    pub fn ssh_password(mut self, password: impl Into<String>) -> Self {
        self.ssh_password = Some(password.into());
        self
    }

    /// Build the K3sManager instance
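    ///
    /// # Panics
    ///
    /// Panics if no token was set; call [`K3sManagerBuilder::token`] before `build`.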
    pub fn build(self) -> K3sManager {
        K3sManager {
            k3s_version: self
                .k3s_version
                .unwrap_or_else(|| "v1.28.5+k3s1".to_string()),
            token: self.token.expect("Token must be set"),
            disable_components: self.disable_components,
            ssh_username: self.ssh_username.unwrap_or_else(|| "root".to_string()),
            ssh_key_path: self.ssh_key_path,
            ssh_password: self.ssh_password,
            state: Arc::new(RwLock::new(None)),
        }
    }
}