testcontainers_modules/k3s/
mod.rs1use std::{
2 borrow::Cow,
3 collections::HashMap,
4 io,
5 io::ErrorKind,
6 path::{Path, PathBuf},
7};
8
9use testcontainers::{
10 core::{ContainerPort, Mount, WaitFor},
11 Image,
12};
13
14const NAME: &str = "rancher/k3s";
15const TAG: &str = "v1.28.8-k3s1";
16pub const TRAEFIK_HTTP: ContainerPort = ContainerPort::Tcp(80);
21pub const KUBE_SECURE_PORT: ContainerPort = ContainerPort::Tcp(6443);
26pub const RANCHER_WEBHOOK_PORT: ContainerPort = ContainerPort::Tcp(8443);
31
32#[derive(Debug, Default, Clone)]
67pub struct K3s {
68 env_vars: HashMap<String, String>,
69 conf_mount: Option<Mount>,
70 cmd: K3sCmd,
71}
72
/// Options for the `server` command that is run inside the K3s container.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct K3sCmd {
    /// Value passed to the server as `--snapshotter=<value>`.
    snapshotter: String,
}
81
82impl K3sCmd {
83 pub fn with_snapshotter(self, snapshotter: impl Into<String>) -> Self {
95 Self {
96 snapshotter: snapshotter.into(),
97 }
98 }
99}
100
101impl Default for K3sCmd {
102 fn default() -> Self {
103 Self {
104 snapshotter: String::from("native"),
105 }
106 }
107}
108
109impl Image for K3s {
110 fn name(&self) -> &str {
111 NAME
112 }
113
114 fn tag(&self) -> &str {
115 TAG
116 }
117
118 fn ready_conditions(&self) -> Vec<WaitFor> {
119 vec![WaitFor::message_on_stderr(
120 "Node controller sync successful",
121 )]
122 }
123
124 fn env_vars(
125 &self,
126 ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
127 &self.env_vars
128 }
129
130 fn mounts(&self) -> impl IntoIterator<Item = &Mount> {
131 let mut mounts = Vec::new();
132 if let Some(conf_mount) = &self.conf_mount {
133 mounts.push(conf_mount);
134 }
135 mounts
136 }
137
138 fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
139 &self.cmd
140 }
141
142 fn expose_ports(&self) -> &[ContainerPort] {
143 &[KUBE_SECURE_PORT, RANCHER_WEBHOOK_PORT, TRAEFIK_HTTP]
144 }
145}
146
147impl K3s {
148 pub fn with_conf_mount(mut self, conf_mount_path: impl AsRef<Path>) -> Self {
163 self.env_vars
164 .insert(String::from("K3S_KUBECONFIG_MODE"), String::from("644"));
165 Self {
166 conf_mount: Some(Mount::bind_mount(
167 conf_mount_path.as_ref().to_str().unwrap_or_default(),
168 "/etc/rancher/k3s/",
169 )),
170 ..self
171 }
172 }
173
174 pub fn read_kube_config(&self) -> io::Result<String> {
191 let k3s_conf_file_path = self
192 .conf_mount
193 .as_ref()
194 .and_then(|mount| mount.source())
195 .map(PathBuf::from)
196 .map(|conf_dir| conf_dir.join("k3s.yaml"))
197 .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "K3s conf dir is not mounted"))?;
198
199 std::fs::read_to_string(k3s_conf_file_path)
200 }
201}
202
203impl IntoIterator for &K3sCmd {
204 type Item = String;
205 type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
206
207 fn into_iter(self) -> Self::IntoIter {
208 let mut cmd = vec![String::from("server")];
209 cmd.push(format!("--snapshotter={}", self.snapshotter));
210 cmd.into_iter()
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use std::env::temp_dir;
217
218 use k8s_openapi::api::core::v1::Pod;
219 use kube::{
220 api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams, ResourceExt},
221 config::{KubeConfigOptions, Kubeconfig},
222 runtime::wait::{await_condition, conditions::is_pod_running},
223 Config,
224 };
225 use rustls::crypto::CryptoProvider;
226 use serde_json::json;
227 use serial_test::serial;
228 use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt};
229
230 use super::*;
231
232 #[serial]
233 #[tokio::test]
234 async fn k3s_pods() -> Result<(), Box<dyn std::error::Error + 'static>> {
235 let conf_dir = temp_dir();
236 let k3s = K3s::default()
237 .with_conf_mount(&conf_dir)
238 .with_privileged(true)
239 .with_userns_mode("host");
240
241 let k3s_container = k3s.start().await?;
242
243 let client = get_kube_client(&k3s_container).await?;
244
245 let pods = Api::<Pod>::all(client)
246 .list(&ListParams::default())
247 .await
248 .expect("Cannot read pods");
249
250 let pod_names = pods
251 .into_iter()
252 .map(|pod| pod.name_any())
253 .collect::<Vec<_>>();
254
255 assert!(
256 pod_names
257 .iter()
258 .any(|pod_name| pod_name.starts_with("coredns")),
259 "coredns pod not found - found pods {pod_names:?}"
260 );
261 assert!(
262 pod_names
263 .iter()
264 .any(|pod_name| pod_name.starts_with("metrics-server")),
265 "metrics-server pod not found - found pods {pod_names:?}"
266 );
267 assert!(
268 pod_names
269 .iter()
270 .any(|pod_name| pod_name.starts_with("local-path-provisioner")),
271 "local-path-provisioner pod not found - found pods {pod_names:?}"
272 );
273 Ok(())
274 }
275
276 #[serial]
278 #[tokio::test]
279 async fn pod_api() -> Result<(), Box<dyn std::error::Error + 'static>> {
280 let conf_dir = temp_dir();
281 let k3s = K3s::default()
282 .with_conf_mount(&conf_dir)
283 .with_privileged(true)
284 .with_userns_mode("host");
285
286 let k3s_container = k3s.start().await?;
287
288 let client = get_kube_client(&k3s_container).await?;
289
290 let pods: Api<Pod> = Api::default_namespaced(client);
292
293 let p: Pod = serde_json::from_value(json!({
295 "apiVersion": "v1",
296 "kind": "Pod",
297 "metadata": { "name": "busybox" },
298 "spec": {
299 "containers": [{
300 "name": "busybox",
301 "image": "busybox:1.36.1-musl"
302 }],
303 }
304 }))?;
305
306 let post_params = PostParams::default();
307 match pods.create(&post_params, &p).await {
308 Ok(o) => {
309 let name = o.name_any();
310 assert_eq!(p.name_any(), name);
311 }
312 Err(kube::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
315
316 let establish = await_condition(pods.clone(), "busybox", is_pod_running());
318 let _ = tokio::time::timeout(std::time::Duration::from_secs(15), establish).await?;
319
320 let p1cpy = pods.get("busybox").await?;
322 if let Some(spec) = &p1cpy.spec {
323 assert_eq!(spec.containers[0].name, "busybox");
324 }
325
326 let patch = json!({
328 "metadata": {
329 "resourceVersion": p1cpy.resource_version(),
330 },
331 "spec": {
332 "activeDeadlineSeconds": 5
333 }
334 });
335
336 let patch_params = PatchParams::default();
337 let p_patched = pods
338 .patch("busybox", &patch_params, &Patch::Merge(&patch))
339 .await?;
340 assert_eq!(p_patched.spec.unwrap().active_deadline_seconds, Some(5));
341
342 let lp = ListParams::default().fields(&format!("metadata.name={}", "busybox")); for p in pods.list(&lp).await? {
344 println!("Found Pod: {}", p.name_any());
345 }
346
347 let delete_params = DeleteParams::default();
349 pods.delete("busybox", &delete_params)
350 .await?
351 .map_left(|pdel| {
352 assert_eq!(pdel.name_any(), "busybox");
353 });
354
355 Ok(())
356 }
357
358 pub async fn get_kube_client(
359 container: &ContainerAsync<K3s>,
360 ) -> Result<kube::Client, Box<dyn std::error::Error + 'static>> {
361 if CryptoProvider::get_default().is_none() {
362 rustls::crypto::ring::default_provider()
363 .install_default()
364 .expect("Error initializing rustls provider");
365 }
366
367 let conf_yaml = container.image().read_kube_config()?;
368
369 let mut config = Kubeconfig::from_yaml(&conf_yaml).expect("Error loading kube config");
370
371 let port = container.get_host_port_ipv4(KUBE_SECURE_PORT).await?;
372 config.clusters.iter_mut().for_each(|cluster| {
373 if let Some(server) = cluster.cluster.as_mut().and_then(|c| c.server.as_mut()) {
374 *server = format!("https://127.0.0.1:{port}")
375 }
376 });
377
378 let client_config =
379 Config::from_custom_kubeconfig(config, &KubeConfigOptions::default()).await?;
380
381 Ok(kube::Client::try_from(client_config)?)
382 }
383}