testcontainers_modules/k3s/
mod.rs1use std::{
2 borrow::Cow,
3 collections::HashMap,
4 io,
5 io::ErrorKind,
6 path::{Path, PathBuf},
7};
8
9use testcontainers::{
10 core::{ContainerPort, Mount, WaitFor},
11 Image,
12};
13
14const NAME: &str = "rancher/k3s";
15const TAG: &str = "v1.28.8-k3s1";
16pub const TRAEFIK_HTTP: ContainerPort = ContainerPort::Tcp(80);
21pub const KUBE_SECURE_PORT: ContainerPort = ContainerPort::Tcp(6443);
26pub const RANCHER_WEBHOOK_PORT: ContainerPort = ContainerPort::Tcp(8443);
31
32#[derive(Debug, Default, Clone)]
67pub struct K3s {
68 env_vars: HashMap<String, String>,
69 conf_mount: Option<Mount>,
70 cmd: K3sCmd,
71}
72
/// Command-line settings for the K3s process started in the container.
///
/// Iterating over a `&K3sCmd` yields the argument list: `server` followed by
/// `--snapshotter=<snapshotter>`.
#[derive(Debug, Clone)]
pub struct K3sCmd {
    /// Value rendered into the `--snapshotter=` argument.
    snapshotter: String,
}

impl K3sCmd {
    /// Returns this command with its snapshotter replaced by `snapshotter`.
    ///
    /// Accepts anything convertible into a [`String`], e.g. `"overlayfs"`.
    pub fn with_snapshotter(mut self, snapshotter: impl Into<String>) -> Self {
        self.snapshotter = snapshotter.into();
        self
    }
}

impl Default for K3sCmd {
    /// Builds a command that uses the `native` snapshotter.
    fn default() -> Self {
        let snapshotter = String::from("native");
        Self { snapshotter }
    }
}
97
98impl Image for K3s {
99 fn name(&self) -> &str {
100 NAME
101 }
102
103 fn tag(&self) -> &str {
104 TAG
105 }
106
107 fn ready_conditions(&self) -> Vec<WaitFor> {
108 vec![WaitFor::message_on_stderr(
109 "Node controller sync successful",
110 )]
111 }
112
113 fn env_vars(
114 &self,
115 ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
116 &self.env_vars
117 }
118
119 fn mounts(&self) -> impl IntoIterator<Item = &Mount> {
120 let mut mounts = Vec::new();
121 if let Some(conf_mount) = &self.conf_mount {
122 mounts.push(conf_mount);
123 }
124 mounts
125 }
126
127 fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
128 &self.cmd
129 }
130
131 fn expose_ports(&self) -> &[ContainerPort] {
132 &[KUBE_SECURE_PORT, RANCHER_WEBHOOK_PORT, TRAEFIK_HTTP]
133 }
134}
135
136impl K3s {
137 #[allow(missing_docs)]
139 pub fn with_conf_mount(mut self, conf_mount_path: impl AsRef<Path>) -> Self {
140 self.env_vars
141 .insert(String::from("K3S_KUBECONFIG_MODE"), String::from("644"));
142 Self {
143 conf_mount: Some(Mount::bind_mount(
144 conf_mount_path.as_ref().to_str().unwrap_or_default(),
145 "/etc/rancher/k3s/",
146 )),
147 ..self
148 }
149 }
150
151 #[allow(missing_docs)]
153 pub fn read_kube_config(&self) -> io::Result<String> {
154 let k3s_conf_file_path = self
155 .conf_mount
156 .as_ref()
157 .and_then(|mount| mount.source())
158 .map(PathBuf::from)
159 .map(|conf_dir| conf_dir.join("k3s.yaml"))
160 .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "K3s conf dir is not mounted"))?;
161
162 std::fs::read_to_string(k3s_conf_file_path)
163 }
164}
165
166impl IntoIterator for &K3sCmd {
167 type Item = String;
168 type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
169
170 fn into_iter(self) -> Self::IntoIter {
171 let mut cmd = vec![String::from("server")];
172 cmd.push(format!("--snapshotter={}", self.snapshotter));
173 cmd.into_iter()
174 }
175}
176
#[cfg(test)]
mod tests {
    use std::env::temp_dir;

    use k8s_openapi::api::core::v1::Pod;
    use kube::{
        api::ListParams,
        config::{KubeConfigOptions, Kubeconfig},
        Api, Config, ResourceExt,
    };
    use rustls::crypto::CryptoProvider;
    use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt};

    use super::*;

    /// Starts a K3s container and checks that pods whose names start with `coredns`,
    /// `metrics-server` and `local-path-provisioner` are listed by the API server.
    #[tokio::test]
    async fn k3s_pods() -> Result<(), Box<dyn std::error::Error + 'static>> {
        // The kubeconfig is exchanged with the container through the system temp directory.
        let kubeconfig_dir = temp_dir();
        let image = K3s::default()
            .with_conf_mount(&kubeconfig_dir)
            .with_privileged(true)
            .with_userns_mode("host");

        let k3s_container = image.start().await?;
        let client = get_kube_client(&k3s_container).await?;

        let pod_api = Api::<Pod>::all(client);
        let pods = pod_api
            .list(&ListParams::default())
            .await
            .expect("Cannot read pods");

        let pod_names: Vec<_> = pods.into_iter().map(|pod| pod.name_any()).collect();
        let has_pod_with_prefix =
            |prefix: &str| pod_names.iter().any(|pod_name| pod_name.starts_with(prefix));

        assert!(
            has_pod_with_prefix("coredns"),
            "coredns pod not found - found pods {pod_names:?}"
        );
        assert!(
            has_pod_with_prefix("metrics-server"),
            "metrics-server pod not found - found pods {pod_names:?}"
        );
        assert!(
            has_pod_with_prefix("local-path-provisioner"),
            "local-path-provisioner pod not found - found pods {pod_names:?}"
        );
        Ok(())
    }

    /// Builds a [`kube::Client`] for the given running K3s container.
    ///
    /// Loads the kubeconfig through [`K3s::read_kube_config`] and rewrites every cluster
    /// that already has a server URL to `https://127.0.0.1:<host port mapped to 6443>`.
    pub async fn get_kube_client(
        container: &ContainerAsync<K3s>,
    ) -> Result<kube::Client, Box<dyn std::error::Error + 'static>> {
        // Install the ring provider only when no default rustls provider is set yet.
        if CryptoProvider::get_default().is_none() {
            rustls::crypto::ring::default_provider()
                .install_default()
                .expect("Error initializing rustls provider");
        }

        let conf_yaml = container.image().read_kube_config()?;
        let mut kubeconfig =
            Kubeconfig::from_yaml(&conf_yaml).expect("Error loading kube config");

        let port = container.get_host_port_ipv4(KUBE_SECURE_PORT).await?;
        for named_cluster in &mut kubeconfig.clusters {
            let server_url = named_cluster
                .cluster
                .as_mut()
                .and_then(|cluster| cluster.server.as_mut());
            if let Some(server) = server_url {
                *server = format!("https://127.0.0.1:{}", port);
            }
        }

        let client_config =
            Config::from_custom_kubeconfig(kubeconfig, &KubeConfigOptions::default()).await?;
        let client = kube::Client::try_from(client_config)?;

        Ok(client)
    }
}