use anyhow::Context;
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::{Pod, Service};
use kube::{
api::ListParams,
config::{KubeConfigOptions, Kubeconfig},
Api, ResourceExt,
};
use rand::random;
use std::{
net::{Ipv4Addr, SocketAddrV4},
path::{Path, PathBuf},
process::{Command, Stdio},
};
use stopper::Stopper;
use tempfile::NamedTempFile;
use tokio::{
net::{TcpListener, TcpStream},
task::{self},
};
use tokio_stream::wrappers::TcpListenerStream;
use tracing::{debug, error, trace};
pub struct Cluster {
kubeconfig_path: PathBuf,
context_name: String,
}
impl Cluster {
pub fn new<P: AsRef<Path>>(kubeconfig_path: P, context_name: &str) -> Self {
let kubeconfig_path = kubeconfig_path.as_ref();
debug!(?kubeconfig_path, context_name, "Creating cluster handle");
Self {
kubeconfig_path: kubeconfig_path.to_path_buf(),
context_name: context_name.to_owned(),
}
}
pub async fn client(&self) -> kube::Client {
kube::Client::try_from(
kube::Config::from_custom_kubeconfig(
Kubeconfig::read_from(&self.kubeconfig_path).unwrap(),
&KubeConfigOptions {
context: Some(self.context_name.clone()),
..KubeConfigOptions::default()
},
)
.await
.unwrap(),
)
.unwrap()
}
pub async fn forward_port(
&self,
namespace: &str,
service_name: &str,
service_port: u16,
) -> PortForward {
let client = self.client().await;
let services: Api<Service> = Api::namespaced(client, namespace);
let service = services.get(service_name).await.unwrap();
let selector = service.spec.as_ref().unwrap().selector.as_ref().unwrap();
let mut label_selector_param = selector
.iter()
.flat_map(|(name, value)| [name, "=", value, ","])
.collect::<String>();
label_selector_param.pop();
let lp = ListParams::default().labels(&label_selector_param);
let client = self.client().await;
let pods: Api<Pod> = Api::namespaced(client, namespace);
let matching_pods = pods.list(&lp).await.unwrap();
let pod = matching_pods
.items
.first()
.unwrap_or_else(|| panic!("could not find any pods for the service {service_name}"));
let pod_name = pod.name_unchecked();
let tcp_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))
.await
.unwrap();
let local_port = tcp_listener.local_addr().unwrap().port();
debug!(
namespace,
service_name, service_port, local_port, "Forwarding port"
);
let stopper = Stopper::new();
let stream = stopper.stop_stream(TcpListenerStream::new(tcp_listener));
task::spawn({
let stopper = stopper.clone();
async move {
let res = stream
.try_for_each(|stream| async {
let (pods, pod_name, stopper) =
(pods.clone(), pod_name.clone(), stopper.clone());
trace!(local_port, "new connection");
task::spawn(async move {
if let Err(e) =
forward_connection(&pods, &pod_name, service_port, stream, &stopper)
.await
{
error!(local_port, error = %e, "Port forward error");
}
});
Ok(())
})
.await;
if let Err(e) = res {
error!(local_port, error = %e, "Port forward TCP server error");
}
}
});
PortForward {
local_port,
stopper,
}
}
}
async fn forward_connection(
pods_api: &Api<Pod>,
pod_name: &str,
port: u16,
mut tcp_stream: TcpStream,
stopper: &Stopper,
) -> anyhow::Result<()> {
let mut forwarder = pods_api.portforward(pod_name, &[port]).await?;
let mut pod_stream = forwarder
.take_stream(port)
.context("stream for forwarded port was missing")?;
stopper
.stop_future(tokio::io::copy_bidirectional(
&mut tcp_stream,
&mut pod_stream,
))
.await
.transpose()?;
drop(pod_stream);
forwarder.join().await?;
trace!("connection closed");
Ok(())
}
pub struct EphemeralCluster {
cluster: Cluster,
kind_cluster_name: String,
}
impl EphemeralCluster {
pub fn create() -> Self {
let kubeconfig_path = NamedTempFile::new().unwrap().into_temp_path().to_path_buf();
let kind_cluster_name = format!("janus-ephemeral-{}", hex::encode(random::<[u8; 4]>()));
let output = Command::new("kind")
.args([
"create",
"cluster",
"--kubeconfig",
&kubeconfig_path.to_string_lossy(),
"--name",
&kind_cluster_name,
"--image",
"kindest/node:v1.26.6@sha256:\
6e2d8b28a5b601defe327b98bd1c2d1930b49e5d8c512e1895099e4504007adb",
])
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.output()
.unwrap();
assert!(
output.status.success(),
"`kind create cluster` failed\nstdout: {}\nstderr: {}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr),
);
Self {
cluster: Cluster::new(kubeconfig_path, &format!("kind-{kind_cluster_name}")),
kind_cluster_name,
}
}
pub fn cluster(&self) -> &Cluster {
&self.cluster
}
}
impl Drop for EphemeralCluster {
fn drop(&mut self) {
assert!(Command::new("kind")
.args([
"delete",
"cluster",
"--kubeconfig",
&self.cluster.kubeconfig_path.to_string_lossy(),
"--name",
&self.kind_cluster_name,
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.unwrap()
.success())
}
}
pub struct PortForward {
local_port: u16,
stopper: Stopper,
}
impl PortForward {
pub fn local_port(&self) -> u16 {
self.local_port
}
}
impl Drop for PortForward {
fn drop(&mut self) {
debug!(?self.local_port, "dropping port forward");
self.stopper.stop();
}
}
#[cfg(test)]
mod tests {
use super::EphemeralCluster;
use crate::test_util::install_test_trace_subscriber;
use k8s_openapi::api::core::v1::Node;
use kube::{api::ListParams, Api};
#[tokio::test]
async fn create_clusters() {
install_test_trace_subscriber();
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let first_cluster = EphemeralCluster::create();
let first_client = first_cluster.cluster.client().await;
let first_nodes: Api<Node> = Api::all(first_client);
assert_eq!(
first_nodes
.list(&ListParams::default())
.await
.iter()
.count(),
1
);
let second_cluster = EphemeralCluster::create();
let second_client = second_cluster.cluster.client().await;
let second_nodes: Api<Node> = Api::all(second_client);
assert_eq!(
second_nodes
.list(&ListParams::default())
.await
.iter()
.count(),
1
);
}
}