testcontainers_modules/k3s/
mod.rs

1use std::{
2    borrow::Cow,
3    collections::HashMap,
4    io,
5    io::ErrorKind,
6    path::{Path, PathBuf},
7};
8
9use testcontainers::{
10    core::{ContainerPort, Mount, WaitFor},
11    Image,
12};
13
14const NAME: &str = "rancher/k3s";
15const TAG: &str = "v1.28.8-k3s1";
16/// Port that the [`traefik`] part of the container has internally
17/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
18///
19/// [`traefik`]: https://doc.traefik.io/traefik/
20pub const TRAEFIK_HTTP: ContainerPort = ContainerPort::Tcp(80);
21/// Port that the [`Kubernetes`] part of the container has internally
22/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
23///
24/// [`Kubernetes`]: https://kubernetes.io/
25pub const KUBE_SECURE_PORT: ContainerPort = ContainerPort::Tcp(6443);
26/// Port that the [`Rancher`] part of the container has internally
27/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
28///
29/// [`Rancher`]: https://rancher.io/
30pub const RANCHER_WEBHOOK_PORT: ContainerPort = ContainerPort::Tcp(8443);
31
32/// Module to work with [`K3s`] inside of tests.
33///
34/// Starts an instance of K3s, a single-node server fully-functional Kubernetes cluster
35/// so you are able to interact with the cluster using standard [`Kubernetes API`] exposed at [`KUBE_SECURE_PORT`] port
36///
37/// This module is based on the official [`K3s docker image`].
38///
39/// # Example
40/// ```
41/// use std::env::temp_dir;
42///
43/// use testcontainers_modules::{
44///     k3s::{K3s, KUBE_SECURE_PORT},
45///     testcontainers::{runners::SyncRunner, ImageExt},
46/// };
47///
48/// let k3s_instance = K3s::default()
49///     .with_conf_mount(&temp_dir())
50///     .with_privileged(true)
51///     .with_userns_mode("host")
52///     .start()
53///     .unwrap();
54///
55/// let kube_port = k3s_instance.get_host_port_ipv4(KUBE_SECURE_PORT);
56/// let kube_conf = k3s_instance
57///     .image()
58///     .read_kube_config()
59///     .expect("Cannot read kube conf");
60/// // use kube_port and kube_conf to connect and control k3s cluster
61/// ```
62///
63/// [`K3s`]: https://k3s.io/
64/// [`Kubernetes API`]: https://kubernetes.io/docs/concepts/overview/kubernetes-api/
65/// [`K3s docker image`]: https://hub.docker.com/r/rancher/k3s
66#[derive(Debug, Default, Clone)]
67pub struct K3s {
68    env_vars: HashMap<String, String>,
69    conf_mount: Option<Mount>,
70    cmd: K3sCmd,
71}
72
73/// Configuration for K3s server command-line arguments.
74///
75/// This struct allows you to customize the K3s server startup configuration
76/// by setting various options like the container snapshotter.
77#[derive(Debug, Clone)]
78pub struct K3sCmd {
79    snapshotter: String,
80}
81
82impl K3sCmd {
83    /// Sets the container snapshotter for the K3s server.
84    ///
85    /// The snapshotter is responsible for managing container filesystem snapshots.
86    /// Common values include "overlayfs", "fuse-overlayfs", or "native".
87    ///
88    /// # Example
89    /// ```
90    /// use testcontainers_modules::k3s::K3sCmd;
91    ///
92    /// let cmd = K3sCmd::default().with_snapshotter("overlayfs");
93    /// ```
94    pub fn with_snapshotter(self, snapshotter: impl Into<String>) -> Self {
95        Self {
96            snapshotter: snapshotter.into(),
97        }
98    }
99}
100
101impl Default for K3sCmd {
102    fn default() -> Self {
103        Self {
104            snapshotter: String::from("native"),
105        }
106    }
107}
108
109impl Image for K3s {
110    fn name(&self) -> &str {
111        NAME
112    }
113
114    fn tag(&self) -> &str {
115        TAG
116    }
117
118    fn ready_conditions(&self) -> Vec<WaitFor> {
119        vec![WaitFor::message_on_stderr(
120            "Node controller sync successful",
121        )]
122    }
123
124    fn env_vars(
125        &self,
126    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
127        &self.env_vars
128    }
129
130    fn mounts(&self) -> impl IntoIterator<Item = &Mount> {
131        let mut mounts = Vec::new();
132        if let Some(conf_mount) = &self.conf_mount {
133            mounts.push(conf_mount);
134        }
135        mounts
136    }
137
138    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
139        &self.cmd
140    }
141
142    fn expose_ports(&self) -> &[ContainerPort] {
143        &[KUBE_SECURE_PORT, RANCHER_WEBHOOK_PORT, TRAEFIK_HTTP]
144    }
145}
146
147impl K3s {
148    /// Mounts a host directory to the K3s configuration directory.
149    ///
150    /// This allows you to access the K3s configuration files (like kubeconfig)
151    /// from the host filesystem. The kubeconfig file will be created at
152    /// `{conf_mount_path}/k3s.yaml` and can be read using [`read_kube_config`](Self::read_kube_config).
153    ///
154    /// # Example
155    /// ```
156    /// use std::path::Path;
157    ///
158    /// use testcontainers_modules::k3s::K3s;
159    ///
160    /// let k3s = K3s::default().with_conf_mount(Path::new("/tmp/k3s-config"));
161    /// ```
162    pub fn with_conf_mount(mut self, conf_mount_path: impl AsRef<Path>) -> Self {
163        self.env_vars
164            .insert(String::from("K3S_KUBECONFIG_MODE"), String::from("644"));
165        Self {
166            conf_mount: Some(Mount::bind_mount(
167                conf_mount_path.as_ref().to_str().unwrap_or_default(),
168                "/etc/rancher/k3s/",
169            )),
170            ..self
171        }
172    }
173
174    /// Reads the kubeconfig file from the mounted configuration directory.
175    ///
176    /// This method reads the `k3s.yaml` file from the mounted configuration directory
177    /// that was set up using [`with_conf_mount`](Self::with_conf_mount).
178    /// The kubeconfig can be used to connect kubectl or other Kubernetes clients to the K3s cluster.
179    ///
180    /// # Example
181    /// ```no_run
182    /// use std::path::Path;
183    ///
184    /// use testcontainers_modules::k3s::K3s;
185    ///
186    /// let k3s = K3s::default().with_conf_mount(Path::new("/tmp/k3s-config"));
187    /// // After starting the container...
188    /// let kubeconfig = k3s.read_kube_config().expect("Failed to read kubeconfig");
189    /// ```
190    pub fn read_kube_config(&self) -> io::Result<String> {
191        let k3s_conf_file_path = self
192            .conf_mount
193            .as_ref()
194            .and_then(|mount| mount.source())
195            .map(PathBuf::from)
196            .map(|conf_dir| conf_dir.join("k3s.yaml"))
197            .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "K3s conf dir is not mounted"))?;
198
199        std::fs::read_to_string(k3s_conf_file_path)
200    }
201}
202
203impl IntoIterator for &K3sCmd {
204    type Item = String;
205    type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
206
207    fn into_iter(self) -> Self::IntoIter {
208        let mut cmd = vec![String::from("server")];
209        cmd.push(format!("--snapshotter={}", self.snapshotter));
210        cmd.into_iter()
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use std::env::temp_dir;
217
218    use k8s_openapi::api::core::v1::Pod;
219    use kube::{
220        api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams, ResourceExt},
221        config::{KubeConfigOptions, Kubeconfig},
222        runtime::wait::{await_condition, conditions::is_pod_running},
223        Config,
224    };
225    use rustls::crypto::CryptoProvider;
226    use serde_json::json;
227    use serial_test::serial;
228    use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt};
229
230    use super::*;
231
232    #[serial]
233    #[tokio::test]
234    async fn k3s_pods() -> Result<(), Box<dyn std::error::Error + 'static>> {
235        let conf_dir = temp_dir();
236        let k3s = K3s::default()
237            .with_conf_mount(&conf_dir)
238            .with_privileged(true)
239            .with_userns_mode("host");
240
241        let k3s_container = k3s.start().await?;
242
243        let client = get_kube_client(&k3s_container).await?;
244
245        let pods = Api::<Pod>::all(client)
246            .list(&ListParams::default())
247            .await
248            .expect("Cannot read pods");
249
250        let pod_names = pods
251            .into_iter()
252            .map(|pod| pod.name_any())
253            .collect::<Vec<_>>();
254
255        assert!(
256            pod_names
257                .iter()
258                .any(|pod_name| pod_name.starts_with("coredns")),
259            "coredns pod not found - found pods {pod_names:?}"
260        );
261        assert!(
262            pod_names
263                .iter()
264                .any(|pod_name| pod_name.starts_with("metrics-server")),
265            "metrics-server pod not found - found pods {pod_names:?}"
266        );
267        assert!(
268            pod_names
269                .iter()
270                .any(|pod_name| pod_name.starts_with("local-path-provisioner")),
271            "local-path-provisioner pod not found - found pods {pod_names:?}"
272        );
273        Ok(())
274    }
275
276    // Based on: https://github.com/kube-rs/kube/blob/main/examples/pod_api.rs
277    #[serial]
278    #[tokio::test]
279    async fn pod_api() -> Result<(), Box<dyn std::error::Error + 'static>> {
280        let conf_dir = temp_dir();
281        let k3s = K3s::default()
282            .with_conf_mount(&conf_dir)
283            .with_privileged(true)
284            .with_userns_mode("host");
285
286        let k3s_container = k3s.start().await?;
287
288        let client = get_kube_client(&k3s_container).await?;
289
290        // Manage pods
291        let pods: Api<Pod> = Api::default_namespaced(client);
292
293        // Create Pod blog
294        let p: Pod = serde_json::from_value(json!({
295            "apiVersion": "v1",
296            "kind": "Pod",
297            "metadata": { "name": "busybox" },
298            "spec": {
299                "containers": [{
300                  "name": "busybox",
301                  "image": "busybox:1.36.1-musl"
302                }],
303            }
304        }))?;
305
306        let post_params = PostParams::default();
307        match pods.create(&post_params, &p).await {
308            Ok(o) => {
309                let name = o.name_any();
310                assert_eq!(p.name_any(), name);
311            }
312            Err(kube::Error::Api(ae)) => assert_eq!(ae.code, 409), // if you skipped delete, for instance
313            Err(e) => return Err(e.into()),                        // any other case is probably bad
314        }
315
316        // Watch it phase for a few seconds
317        let establish = await_condition(pods.clone(), "busybox", is_pod_running());
318        let _ = tokio::time::timeout(std::time::Duration::from_secs(15), establish).await?;
319
320        // Verify we can get it
321        let p1cpy = pods.get("busybox").await?;
322        if let Some(spec) = &p1cpy.spec {
323            assert_eq!(spec.containers[0].name, "busybox");
324        }
325
326        // Replace its spec
327        let patch = json!({
328            "metadata": {
329                "resourceVersion": p1cpy.resource_version(),
330            },
331            "spec": {
332                "activeDeadlineSeconds": 5
333            }
334        });
335
336        let patch_params = PatchParams::default();
337        let p_patched = pods
338            .patch("busybox", &patch_params, &Patch::Merge(&patch))
339            .await?;
340        assert_eq!(p_patched.spec.unwrap().active_deadline_seconds, Some(5));
341
342        let lp = ListParams::default().fields(&format!("metadata.name={}", "busybox")); // only want results for our pod
343        for p in pods.list(&lp).await? {
344            println!("Found Pod: {}", p.name_any());
345        }
346
347        // Delete it
348        let delete_params = DeleteParams::default();
349        pods.delete("busybox", &delete_params)
350            .await?
351            .map_left(|pdel| {
352                assert_eq!(pdel.name_any(), "busybox");
353            });
354
355        Ok(())
356    }
357
358    pub async fn get_kube_client(
359        container: &ContainerAsync<K3s>,
360    ) -> Result<kube::Client, Box<dyn std::error::Error + 'static>> {
361        if CryptoProvider::get_default().is_none() {
362            rustls::crypto::ring::default_provider()
363                .install_default()
364                .expect("Error initializing rustls provider");
365        }
366
367        let conf_yaml = container.image().read_kube_config()?;
368
369        let mut config = Kubeconfig::from_yaml(&conf_yaml).expect("Error loading kube config");
370
371        let port = container.get_host_port_ipv4(KUBE_SECURE_PORT).await?;
372        config.clusters.iter_mut().for_each(|cluster| {
373            if let Some(server) = cluster.cluster.as_mut().and_then(|c| c.server.as_mut()) {
374                *server = format!("https://127.0.0.1:{port}")
375            }
376        });
377
378        let client_config =
379            Config::from_custom_kubeconfig(config, &KubeConfigOptions::default()).await?;
380
381        Ok(kube::Client::try_from(client_config)?)
382    }
383}