pub mod adapter;
mod error;
mod state;

pub use adapter::K3sAdapter;
pub use error::{K3sError, Result};
pub use state::{
    ClusterState, ClusterStatus, DesiredClusterConfig, NodeInfo, NodeStatus, ReconciliationPlan,
    WorkerConfig, WorkerNode,
};

use lmrc_ssh::{AuthMethod, SshClient};
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tracing::{debug, info};

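/// Manages a K3s cluster's lifecycle over SSH: master installation, worker
/// joins and removals, kubeconfig retrieval, and reconciliation against a
/// desired configuration.
///
/// A minimal usage sketch (the crate path `lmrc_k3s` and all host/credential
/// values below are illustrative):
///
/// ```no_run
/// use lmrc_k3s::K3sManager;
///
/// # async fn demo() -> lmrc_k3s::Result<()> {
/// let manager = K3sManager::builder()
///     .version("v1.28.5+k3s1")
///     .token("example-token")
///     .ssh_username("root")
///     .ssh_key_path("/home/ops/.ssh/id_rsa")
///     .build();
///
/// manager
///     .install_master("203.0.113.10", Some("10.0.0.10"), false)
///     .await?;
/// # Ok(())
/// # }
/// ```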
pub struct K3sManager {
    k3s_version: String,
    token: String,
    disable_components: Vec<String>,
    ssh_username: String,
    ssh_key_path: Option<String>,
    ssh_password: Option<String>,
    state: Arc<RwLock<Option<ClusterState>>>,
}

impl K3sManager {
    pub fn new(k3s_version: String, token: String, disable_components: Vec<String>) -> Self {
        Self {
            k3s_version,
            token,
            disable_components,
            ssh_username: "root".to_string(),
            ssh_key_path: None,
            ssh_password: None,
            state: Arc::new(RwLock::new(None)),
        }
    }

    pub fn builder() -> K3sManagerBuilder {
        K3sManagerBuilder::default()
    }

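    /// Runs `command` on `host` over SSH and returns its stdout.
    ///
    /// Authentication falls back in order: configured password, configured
    /// key path, the `SSH_KEY_PATH` environment variable, then the relative
    /// path `.ssh/id_rsa`. A non-zero exit status is reported as
    /// `K3sError::CommandError` carrying the captured stderr.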
    pub async fn execute_ssh(&self, host: &str, command: &str) -> Result<String> {
        debug!("Executing on {}: {}", host, command);

        let auth = if let Some(ref password) = self.ssh_password {
            AuthMethod::Password {
                username: self.ssh_username.clone(),
                password: password.clone(),
            }
        } else {
            let key_path = self.ssh_key_path.clone().unwrap_or_else(|| {
                std::env::var("SSH_KEY_PATH").unwrap_or_else(|_| ".ssh/id_rsa".to_string())
            });

            AuthMethod::PublicKey {
                username: self.ssh_username.clone(),
                private_key_path: key_path,
                passphrase: None,
            }
        };

        let mut client = SshClient::new(host, 22)
            .map_err(|e| K3sError::ConnectionError {
                host: host.to_string(),
                message: format!("Failed to create SSH client: {}", e),
            })?
            .with_auth(auth)
            .connect()
            .map_err(|e| K3sError::ConnectionError {
                host: host.to_string(),
                message: format!("Failed to connect: {}", e),
            })?;

        let output = client
            .execute(command)
            .map_err(|e| K3sError::CommandError {
                host: host.to_string(),
                command: command.to_string(),
                exit_code: None,
                message: format!("Execution failed: {}", e),
            })?;

        if output.exit_status != 0 {
            return Err(K3sError::CommandError {
                host: host.to_string(),
                command: command.to_string(),
                exit_code: Some(output.exit_status),
                message: format!("stderr: {}", output.stderr),
            });
        }

        Ok(output.stdout)
    }

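    /// Returns `true` when `which k3s` finds a binary on `server_ip`.
    /// Any SSH or command failure is treated as "not installed".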
    pub async fn is_installed(&self, server_ip: &str) -> Result<bool> {
        match self.execute_ssh(server_ip, "which k3s").await {
            Ok(output) => Ok(!output.trim().is_empty()),
            Err(_) => Ok(false),
        }
    }

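    /// Installs K3s in server mode (`--cluster-init`) on `server_ip` and
    /// waits for `kubectl get nodes` to respond (up to 30 attempts, 10s
    /// apart). Unless `force` is set, the call is rejected when K3s is
    /// already present. A `private_ip`, when given, is passed as both
    /// `--node-ip` and `--tls-san` so the server certificate covers the
    /// private address.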
    pub async fn install_master(
        &self,
        server_ip: &str,
        private_ip: Option<&str>,
        force: bool,
    ) -> Result<()> {
        info!("Installing K3s on master node {}", server_ip);

        if !force && self.is_installed(server_ip).await? {
            return Err(K3sError::AlreadyInstalledError {
                node: server_ip.to_string(),
            });
        }

        {
            let mut state = self.state.write().unwrap();
            *state = Some(ClusterState::new(
                server_ip.to_string(),
                private_ip.map(String::from),
                self.k3s_version.clone(),
                self.token.clone(),
                self.disable_components.clone(),
            ));
        }

        let mut install_cmd = if self.k3s_version == "latest" || self.k3s_version.is_empty() {
            format!(
                "curl -sfL https://get.k3s.io | K3S_TOKEN={} sh -s - server --cluster-init",
                self.token
            )
        } else {
            format!(
                "curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION={} K3S_TOKEN={} sh -s - server --cluster-init",
                self.k3s_version, self.token
            )
        };

        if !self.disable_components.is_empty() {
            let disable_args = self
                .disable_components
                .iter()
                .map(|c| format!("--disable {}", c))
                .collect::<Vec<_>>()
                .join(" ");
            install_cmd.push_str(&format!(" {}", disable_args));
        }

        if let Some(priv_ip) = private_ip {
            install_cmd.push_str(&format!(" --node-ip={} --tls-san={}", priv_ip, priv_ip));
        }

        self.execute_ssh(server_ip, &install_cmd).await?;

        self.wait_for_k3s_ready(server_ip, 30, Duration::from_secs(10))
            .await?;

        info!("K3s master installation complete");
        Ok(())
    }

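    /// Joins `worker_ip` to the cluster as a K3s agent.
    ///
    /// Two pre-flight checks run first: an `nc` probe from the worker to
    /// the master's API port 6443 (the usual failure mode is a firewall),
    /// and a `systemctl is-active k3s` check on the master. When the
    /// recorded cluster state holds a master private IP, agents register
    /// against that address instead of `master_ip`.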
    pub async fn join_worker(
        &self,
        worker_ip: &str,
        master_ip: &str,
        worker_private_ip: Option<&str>,
    ) -> Result<()> {
        info!("Joining worker {} to cluster", worker_ip);

        let master_cluster_ip = {
            let state = self.state.read().unwrap();
            state
                .as_ref()
                .and_then(|s| s.master_private_ip.clone())
                .unwrap_or_else(|| master_ip.to_string())
        };

        info!(
            "Using master IP {} for cluster communication",
            master_cluster_ip
        );

        debug!("Running pre-flight connectivity checks...");

        debug!("Testing K3s API connectivity from worker to master...");
        let connectivity_test = format!(
            "timeout 10 nc -zv {} 6443 2>&1 || echo 'CONNECTION_FAILED'",
            master_cluster_ip
        );
        let connectivity_result = self.execute_ssh(worker_ip, &connectivity_test).await?;

        if connectivity_result.contains("CONNECTION_FAILED")
            || connectivity_result.contains("timed out")
        {
            return Err(K3sError::ConnectionError {
                host: worker_ip.to_string(),
                message: format!(
                    "Worker cannot reach master on port 6443. This usually indicates a firewall issue. \
                     Connectivity test output: {}",
                    connectivity_result
                ),
            });
        }
        debug!("K3s API connectivity check passed");

        debug!("Verifying master K3s service is running...");
        let service_check = "systemctl is-active k3s 2>&1";
        let service_result = self.execute_ssh(master_ip, service_check).await?;

        // `systemctl is-active` prints "inactive" for a stopped unit, and
        // "inactive" contains the substring "active", so compare exactly
        // rather than with `contains`.
        if service_result.trim() != "active" {
            return Err(K3sError::ClusterNotReadyError {
                attempts: 1,
                message: format!(
                    "Master K3s service is not running. Service status: {}",
                    service_result
                ),
            });
        }
        debug!("Master K3s service is running");

        {
            let mut state = self.state.write().unwrap();
            if let Some(ref mut cluster_state) = *state {
                cluster_state
                    .add_worker(worker_ip.to_string(), worker_private_ip.map(String::from));
            }
        }

        let mut install_cmd = if self.k3s_version == "latest" || self.k3s_version.is_empty() {
            format!(
                "curl -sfL https://get.k3s.io | K3S_URL=https://{}:6443 K3S_TOKEN={} sh -s -",
                master_cluster_ip, self.token
            )
        } else {
            format!(
                "curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION={} K3S_URL=https://{}:6443 K3S_TOKEN={} sh -s -",
                self.k3s_version, master_cluster_ip, self.token
            )
        };

        if let Some(priv_ip) = worker_private_ip {
            install_cmd.push_str(&format!(" --node-ip={}", priv_ip));
        }

        info!("Running K3s agent installation on worker (this may take 2-5 minutes)...");
        self.execute_ssh(worker_ip, &install_cmd).await?;

        info!("K3s agent installation completed, waiting for node to be ready...");
        tokio::time::sleep(Duration::from_secs(5)).await;

        {
            let mut state = self.state.write().unwrap();
            if let Some(ref mut cluster_state) = *state {
                cluster_state.update_worker_status(worker_ip, NodeStatus::Ready);
                cluster_state.update_cluster_status();
            }
        }

        info!("Worker joined successfully");
        Ok(())
    }

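    /// Fetches `/etc/rancher/k3s/k3s.yaml` from the master and writes it to
    /// `output_path`, rewriting `127.0.0.1` to `master_ip` so the file works
    /// from outside the node.
    ///
    /// A usage sketch (crate path `lmrc_k3s` assumed, path illustrative):
    ///
    /// ```no_run
    /// # async fn demo(manager: &lmrc_k3s::K3sManager) -> lmrc_k3s::Result<()> {
    /// manager.download_kubeconfig("203.0.113.10", "./kubeconfig.yaml").await?;
    /// # Ok(())
    /// # }
    /// ```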
    pub async fn download_kubeconfig<P: AsRef<Path>>(
        &self,
        master_ip: &str,
        output_path: P,
    ) -> Result<()> {
        info!("Downloading kubeconfig");

        let content = self
            .execute_ssh(master_ip, "cat /etc/rancher/k3s/k3s.yaml")
            .await?;
        let kubeconfig = content.replace("127.0.0.1", master_ip);

        std::fs::write(output_path.as_ref(), kubeconfig).map_err(|e| K3sError::IoError {
            operation: "write kubeconfig".to_string(),
            source: e,
        })?;

        Ok(())
    }

    pub async fn get_nodes(&self, master_ip: &str) -> Result<String> {
        self.execute_ssh(master_ip, "kubectl get nodes").await
    }

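    /// Polls `kubectl get nodes` on `server_ip`, sleeping `wait_duration`
    /// between attempts, and fails with `ClusterNotReadyError` after
    /// `max_attempts` tries.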
    async fn wait_for_k3s_ready(
        &self,
        server_ip: &str,
        max_attempts: u32,
        wait_duration: Duration,
    ) -> Result<()> {
        for attempt in 1..=max_attempts {
            match self.execute_ssh(server_ip, "kubectl get nodes").await {
                Ok(_) => return Ok(()),
                Err(e) => {
                    if attempt == max_attempts {
                        return Err(K3sError::ClusterNotReadyError {
                            attempts: max_attempts,
                            message: e.to_string(),
                        });
                    }
                    tokio::time::sleep(wait_duration).await;
                }
            }
        }
        // Only reachable when max_attempts == 0.
        Ok(())
    }

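    /// Runs the uninstall script K3s ships with (`k3s-uninstall.sh` on the
    /// master, `k3s-agent-uninstall.sh` on workers). Failures are ignored
    /// so teardown stays best-effort.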
    pub async fn uninstall(&self, server_ip: &str, is_master: bool) -> Result<()> {
        let script = if is_master {
            "/usr/local/bin/k3s-uninstall.sh"
        } else {
            "/usr/local/bin/k3s-agent-uninstall.sh"
        };

        let _ = self.execute_ssh(server_ip, script).await;
        Ok(())
    }

    pub fn get_cluster_state(&self) -> Option<ClusterState> {
        self.state.read().unwrap().clone()
    }

    pub async fn refresh_cluster_state(&self, master_ip: &str) -> Result<()> {
        let _ = self.get_nodes(master_ip).await?;

        let mut state = self.state.write().unwrap();
        if let Some(ref mut cluster_state) = *state {
            cluster_state.update_cluster_status();
        }
        Ok(())
    }

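    /// Runs `kubectl get nodes` and parses every non-header line into a
    /// `NodeInfo`; lines that fail to parse are skipped.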
    pub async fn get_node_info_list(&self, master_ip: &str) -> Result<Vec<NodeInfo>> {
        let output = self.get_nodes(master_ip).await?;

        let nodes: Vec<NodeInfo> = output
            .lines()
            .skip(1) // skip the kubectl header row
            .filter_map(NodeInfo::parse_from_kubectl_line)
            .collect();

        Ok(nodes)
    }

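    /// Diffs `desired` against the recorded state: workers only in
    /// `desired` become additions, workers only in the recorded state
    /// become removals, and the master is scheduled for installation when
    /// `which k3s` finds nothing on it.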
    pub async fn plan_reconciliation(
        &self,
        desired: &DesiredClusterConfig,
    ) -> Result<ReconciliationPlan> {
        let master_installed = self.is_installed(&desired.master_ip).await?;

        let current_state = self.get_cluster_state();
        let current_workers: Vec<String> = current_state
            .as_ref()
            .map(|s| s.workers.iter().map(|w| w.ip.clone()).collect())
            .unwrap_or_default();

        let desired_workers: Vec<String> = desired.workers.iter().map(|w| w.ip.clone()).collect();

        let workers_to_add: Vec<WorkerConfig> = desired
            .workers
            .iter()
            .filter(|w| !current_workers.contains(&w.ip))
            .cloned()
            .collect();

        let workers_to_remove: Vec<String> = current_workers
            .into_iter()
            .filter(|ip| !desired_workers.contains(ip))
            .collect();

        Ok(ReconciliationPlan {
            workers_to_add,
            workers_to_remove,
            install_master: !master_installed,
        })
    }

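    /// Applies a computed plan in order: master installation, worker
    /// removals, then worker additions, so new agents always join a running
    /// control plane.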
    pub async fn apply_reconciliation(
        &self,
        desired: &DesiredClusterConfig,
        plan: &ReconciliationPlan,
    ) -> Result<()> {
        if plan.install_master {
            info!("Installing master as part of reconciliation");
            self.install_master(
                &desired.master_ip,
                desired.master_private_ip.as_deref(),
                false,
            )
            .await?;
        }

        for worker_ip in &plan.workers_to_remove {
            info!("Removing worker node: {}", worker_ip);
            self.remove_worker(&desired.master_ip, worker_ip).await?;
        }

        for worker in &plan.workers_to_add {
            info!("Adding worker node: {}", worker.ip);
            self.join_worker(&worker.ip, &desired.master_ip, worker.private_ip.as_deref())
                .await?;
        }

        Ok(())
    }

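    /// Plans and applies in one call; a no-op when nothing has changed.
    ///
    /// A sketch of the declarative flow (crate path `lmrc_k3s` assumed):
    ///
    /// ```no_run
    /// # use lmrc_k3s::{DesiredClusterConfig, K3sManager};
    /// # async fn demo(manager: &K3sManager, desired: &DesiredClusterConfig) -> lmrc_k3s::Result<()> {
    /// manager.reconcile(desired).await?;
    /// # Ok(())
    /// # }
    /// ```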
    pub async fn reconcile(&self, desired: &DesiredClusterConfig) -> Result<()> {
        let plan = self.plan_reconciliation(desired).await?;

        if !plan.has_changes() {
            info!("Cluster is already in desired state");
            return Ok(());
        }

        info!("Reconciliation plan: {}", plan.summary());
        self.apply_reconciliation(desired, &plan).await?;

        info!("Reconciliation complete");
        Ok(())
    }

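    /// Removes a worker in three best-effort steps: drain it (ignoring
    /// daemonsets), delete it from the cluster, then run the agent
    /// uninstall script on the worker itself. Drain and delete failures
    /// are ignored so a node that is already unreachable can still be
    /// dropped from local state.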
    pub async fn remove_worker(&self, master_ip: &str, worker_ip: &str) -> Result<()> {
        info!("Removing worker node: {}", worker_ip);

        let nodes = self.get_node_info_list(master_ip).await?;
        let node_name = nodes
            .iter()
            .find(|n| {
                n.internal_ip.as_deref() == Some(worker_ip)
                    || n.external_ip.as_deref() == Some(worker_ip)
                    || n.name.contains(worker_ip)
            })
            .map(|n| n.name.clone());

        if let Some(name) = node_name {
            info!("Draining node: {}", name);
            let drain_cmd = format!(
                "kubectl drain {} --ignore-daemonsets --delete-emptydir-data --force --timeout=60s",
                name
            );
            let _ = self.execute_ssh(master_ip, &drain_cmd).await;

            info!("Deleting node from cluster: {}", name);
            let delete_cmd = format!("kubectl delete node {}", name);
            let _ = self.execute_ssh(master_ip, &delete_cmd).await;
        }

        info!("Uninstalling K3s from worker: {}", worker_ip);
        self.uninstall(worker_ip, false).await?;

        {
            let mut state = self.state.write().unwrap();
            if let Some(ref mut cluster_state) = *state {
                cluster_state.workers.retain(|w| w.ip != worker_ip);
                cluster_state.update_cluster_status();
            }
        }

        info!("Worker node removed successfully");
        Ok(())
    }
}

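/// Builder for [`K3sManager`]. A token is required; `build` panics without
/// one. The K3s version defaults to `v1.28.5+k3s1` and the SSH username to
/// `root`.
///
/// ```no_run
/// # use lmrc_k3s::K3sManager; // crate path assumed
/// let manager = K3sManager::builder()
///     .version("latest")
///     .token("example-token") // illustrative values
///     .ssh_password("example-password")
///     .build();
/// ```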
#[derive(Default)]
pub struct K3sManagerBuilder {
    k3s_version: Option<String>,
    token: Option<String>,
    disable_components: Vec<String>,
    ssh_username: Option<String>,
    ssh_key_path: Option<String>,
    ssh_password: Option<String>,
}

impl K3sManagerBuilder {
    pub fn version(mut self, version: impl Into<String>) -> Self {
        self.k3s_version = Some(version.into());
        self
    }

    pub fn token(mut self, token: impl Into<String>) -> Self {
        self.token = Some(token.into());
        self
    }

    pub fn disable(mut self, components: Vec<String>) -> Self {
        self.disable_components = components;
        self
    }

    pub fn ssh_username(mut self, username: impl Into<String>) -> Self {
        self.ssh_username = Some(username.into());
        self
    }

    pub fn ssh_key_path(mut self, path: impl Into<String>) -> Self {
        self.ssh_key_path = Some(path.into());
        self
    }

    pub fn ssh_password(mut self, password: impl Into<String>) -> Self {
        self.ssh_password = Some(password.into());
        self
    }

    pub fn build(self) -> K3sManager {
        K3sManager {
            k3s_version: self
                .k3s_version
                .unwrap_or_else(|| "v1.28.5+k3s1".to_string()),
            token: self.token.expect("Token must be set"),
            disable_components: self.disable_components,
            ssh_username: self.ssh_username.unwrap_or_else(|| "root".to_string()),
            ssh_key_path: self.ssh_key_path,
            ssh_password: self.ssh_password,
            state: Arc::new(RwLock::new(None)),
        }
    }
}