use {
k8s_openapi::api::core::v1::Pod,
kube::{
api::{
Api,
ListParams,
},
Client,
},
tokio::{
net::TcpListener,
sync::{
mpsc,
watch,
},
},
};
#[derive(Clone, Debug, PartialEq)]
pub enum PortForwardStatus {
Starting,
Active,
Paused,
Reconnecting { attempt: u32 },
Error(String),
Cancelled,
}
impl PortForwardStatus {
pub fn display(&self) -> &str {
match self {
| Self::Starting => "Starting",
| Self::Active => "Active",
| Self::Paused => "Paused",
| Self::Reconnecting { .. } => "Reconnecting",
| Self::Error(_) => "Error",
| Self::Cancelled => "Cancelled",
}
}
pub fn is_running(&self) -> bool {
matches!(self, Self::Active | Self::Reconnecting { .. } | Self::Starting)
}
}
#[derive(Clone, Debug)]
pub enum PfTarget {
DirectPod { pod_name: String },
LabelSelector { selector: String },
}
#[derive(Clone, Debug)]
pub struct PortInfo {
pub container_port: u16,
pub name: String,
pub protocol: String,
}
#[allow(dead_code)]
pub struct PortForwardEntry {
pub id: usize,
pub local_port: u16,
pub remote_port: u16,
pub pod_name: String,
pub namespace: String,
pub context: String,
pub resource_label: String,
pub status: PortForwardStatus,
pub connections: usize,
pub target: Option<PfTarget>,
cancel_tx: watch::Sender<bool>,
pause_tx: watch::Sender<bool>,
}
enum PfUpdate {
Status { id: usize, status: PortForwardStatus },
Connection { id: usize, delta: i32 },
PodResolved { id: usize, pod_name: String },
}
pub struct PortForwardManager {
entries: Vec<PortForwardEntry>,
next_id: usize,
update_rx: mpsc::UnboundedReceiver<PfUpdate>,
update_tx: mpsc::UnboundedSender<PfUpdate>,
}
impl PortForwardManager {
pub fn new() -> Self {
let (update_tx, update_rx) = mpsc::unbounded_channel();
Self {
entries: Vec::new(),
next_id: 0,
update_rx,
update_tx,
}
}
pub fn entries(&self) -> &[PortForwardEntry] {
&self.entries
}
#[allow(dead_code)]
pub fn get(&self, id: usize) -> Option<&PortForwardEntry> {
self.entries.iter().find(|e| e.id == id)
}
pub fn create(
&mut self,
client: Client,
namespace: String,
pod_name: String,
context: String,
resource_label: String,
local_port: u16,
remote_port: u16,
target: PfTarget,
) -> usize {
let id = self.next_id;
self.next_id += 1;
let (cancel_tx, cancel_rx) = watch::channel(false);
let (pause_tx, pause_rx) = watch::channel(false);
let entry = PortForwardEntry {
id,
local_port,
remote_port,
pod_name: pod_name.clone(),
namespace: namespace.clone(),
context,
resource_label,
status: PortForwardStatus::Starting,
connections: 0,
target: Some(target.clone()),
cancel_tx,
pause_tx,
};
self.entries.push(entry);
let update_tx = self.update_tx.clone();
tokio::spawn(port_forward_task(
id,
client,
namespace,
target,
local_port,
remote_port,
cancel_rx,
pause_rx,
update_tx,
));
id
}
pub fn create_paused(
&mut self,
namespace: String,
pod_name: String,
context: String,
resource_label: String,
local_port: u16,
remote_port: u16,
target: PfTarget,
) -> usize {
let id = self.next_id;
self.next_id += 1;
let (cancel_tx, _cancel_rx) = watch::channel(false);
let (pause_tx, _pause_rx) = watch::channel(true);
let entry = PortForwardEntry {
id,
local_port,
remote_port,
pod_name,
namespace,
context,
resource_label,
status: PortForwardStatus::Paused,
connections: 0,
target: Some(target),
cancel_tx,
pause_tx,
};
self.entries.push(entry);
id
}
pub fn resume_spawn(&mut self, id: usize, client: Client) -> bool {
let entry = match self.entries.iter_mut().find(|e| e.id == id) {
| Some(e) if matches!(e.status, PortForwardStatus::Paused | PortForwardStatus::Error(_)) => e,
| _ => return false,
};
let target = match entry.target.clone() {
| Some(t) => t,
| None => return false,
};
let local_port = entry.local_port;
let remote_port = entry.remote_port;
let namespace = entry.namespace.clone();
let (cancel_tx, cancel_rx) = watch::channel(false);
let (pause_tx, pause_rx) = watch::channel(false);
entry.cancel_tx = cancel_tx;
entry.pause_tx = pause_tx;
entry.status = PortForwardStatus::Starting;
let update_tx = self.update_tx.clone();
tokio::spawn(port_forward_task(
id,
client,
namespace,
target,
local_port,
remote_port,
cancel_rx,
pause_rx,
update_tx,
));
true
}
pub fn cancel(&mut self, id: usize) {
if let Some(entry) = self.entries.iter_mut().find(|e| e.id == id) {
let _ = entry.cancel_tx.send(true);
entry.status = PortForwardStatus::Cancelled;
}
}
pub fn pause(&mut self, id: usize) {
if let Some(entry) = self.entries.iter_mut().find(|e| e.id == id) {
if entry.status.is_running() {
let _ = entry.pause_tx.send(true);
}
if entry.status.is_running() || matches!(entry.status, PortForwardStatus::Error(_)) {
entry.status = PortForwardStatus::Paused;
}
}
}
pub fn resume(&mut self, id: usize) {
if let Some(entry) = self.entries.iter_mut().find(|e| e.id == id) {
if matches!(entry.status, PortForwardStatus::Paused) {
let _ = entry.pause_tx.send(false);
entry.status = PortForwardStatus::Active;
}
}
}
pub fn cancel_all(&mut self) {
for entry in &mut self.entries {
if !matches!(entry.status, PortForwardStatus::Cancelled) {
let _ = entry.cancel_tx.send(true);
entry.status = PortForwardStatus::Cancelled;
}
}
self.entries.clear();
}
pub fn remove_cancelled(&mut self) {
self.entries
.retain(|e| !matches!(e.status, PortForwardStatus::Cancelled));
}
pub fn poll_updates(&mut self) {
while let Ok(update) = self.update_rx.try_recv() {
match update {
| PfUpdate::Status { id, status } => {
if let Some(entry) = self.entries.iter_mut().find(|e| e.id == id) {
if !matches!(entry.status, PortForwardStatus::Cancelled | PortForwardStatus::Paused) {
entry.status = status;
}
}
},
| PfUpdate::Connection { id, delta } => {
if let Some(entry) = self.entries.iter_mut().find(|e| e.id == id) {
if delta > 0 {
entry.connections = entry.connections.saturating_add(delta as usize);
} else {
entry.connections = entry.connections.saturating_sub(delta.unsigned_abs() as usize);
}
}
},
| PfUpdate::PodResolved { id, pod_name } => {
if let Some(entry) = self.entries.iter_mut().find(|e| e.id == id) {
entry.pod_name = pod_name;
}
},
}
}
}
}
impl Drop for PortForwardManager {
fn drop(&mut self) {
self.cancel_all();
}
}
async fn resolve_pod(client: &Client, namespace: &str, target: &PfTarget) -> Option<String> {
match target {
| PfTarget::DirectPod { pod_name } => Some(pod_name.clone()),
| PfTarget::LabelSelector { selector } => {
let api: Api<Pod> = Api::namespaced(client.clone(), namespace);
let lp = ListParams::default().labels(selector);
let pods = api.list(&lp).await.ok()?;
pods.items
.iter()
.find(|p| p.status.as_ref().and_then(|s| s.phase.as_deref()) == Some("Running"))
.or_else(|| pods.items.first())
.and_then(|p| p.metadata.name.clone())
},
}
}
async fn port_forward_task(
id: usize,
client: Client,
namespace: String,
target: PfTarget,
local_port: u16,
remote_port: u16,
mut cancel_rx: watch::Receiver<bool>,
mut pause_rx: watch::Receiver<bool>,
update_tx: mpsc::UnboundedSender<PfUpdate>,
) {
let listener = match TcpListener::bind(("127.0.0.1", local_port)).await {
| Ok(l) => l,
| Err(_) => {
let _ = update_tx.send(PfUpdate::Status {
id,
status: PortForwardStatus::Error(format!("Port :{} in use", local_port)),
});
return;
},
};
let _ = update_tx.send(PfUpdate::Status {
id,
status: PortForwardStatus::Active,
});
let mut listener = Some(listener);
loop {
if *pause_rx.borrow() {
drop(listener.take());
loop {
tokio::select! {
biased;
_ = cancel_rx.changed() => {
if *cancel_rx.borrow() {
return;
}
}
_ = pause_rx.changed() => {
if !*pause_rx.borrow() {
match TcpListener::bind(("127.0.0.1", local_port)).await {
| Ok(l) => {
listener = Some(l);
let _ = update_tx.send(PfUpdate::Status {
id,
status: PortForwardStatus::Active,
});
break;
},
| Err(e) => {
let _ = update_tx.send(PfUpdate::Status {
id,
status: PortForwardStatus::Error(format!("Re-bind :{} {}", local_port, e)),
});
return;
},
}
}
}
}
}
}
let l = listener.as_ref().unwrap();
tokio::select! {
biased;
_ = cancel_rx.changed() => {
if *cancel_rx.borrow() {
return;
}
}
_ = pause_rx.changed() => {
continue;
}
accept = l.accept() => {
match accept {
| Ok((tcp_stream, _)) => {
let pod_name = match resolve_pod(&client, &namespace, &target).await {
| Some(name) => {
let _ = update_tx.send(PfUpdate::PodResolved {
id,
pod_name: name.clone(),
});
name
},
| None => {
let _ = update_tx.send(PfUpdate::Status {
id,
status: PortForwardStatus::Reconnecting { attempt: 1 },
});
drop(tcp_stream);
continue;
},
};
let client = client.clone();
let ns = namespace.clone();
let tx = update_tx.clone();
let cancel_rx2 = cancel_rx.clone();
tokio::spawn(async move {
let _ = tx.send(PfUpdate::Connection { id, delta: 1 });
handle_pf_connection(
id,
client,
&ns,
&pod_name,
remote_port,
tcp_stream,
cancel_rx2,
&tx,
)
.await;
let _ = tx.send(PfUpdate::Connection { id, delta: -1 });
});
},
| Err(e) => {
let _ = update_tx.send(PfUpdate::Status {
id,
status: PortForwardStatus::Error(format!("Accept: {}", e)),
});
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
},
}
}
}
}
}
async fn handle_pf_connection(
id: usize,
client: Client,
namespace: &str,
pod_name: &str,
remote_port: u16,
mut tcp_stream: tokio::net::TcpStream,
mut cancel_rx: watch::Receiver<bool>,
update_tx: &mpsc::UnboundedSender<PfUpdate>,
) {
let pods: Api<Pod> = Api::namespaced(client, namespace);
let mut attempt = 0u32;
loop {
if *cancel_rx.borrow() {
return;
}
match pods.portforward(pod_name, &[remote_port]).await {
| Ok(mut pf) => {
let _ = update_tx.send(PfUpdate::Status {
id,
status: PortForwardStatus::Active,
});
if let Some(mut pf_stream) = pf.take_stream(remote_port) {
tokio::select! {
result = tokio::io::copy_bidirectional(&mut tcp_stream, &mut pf_stream) => {
drop(pf_stream);
let _ = pf.join().await;
match result {
| Ok(_) | Err(_) => return,
}
}
_ = cancel_rx.changed() => {
return;
}
}
} else {
let _ = pf.join().await;
return;
}
},
| Err(e) => {
attempt += 1;
if attempt > 10 {
let _ = update_tx.send(PfUpdate::Status {
id,
status: PortForwardStatus::Error(format!("Portforward: {}", e)),
});
return;
}
let _ = update_tx.send(PfUpdate::Status {
id,
status: PortForwardStatus::Reconnecting { attempt },
});
let backoff = std::time::Duration::from_millis(500 * 2u64.pow(attempt.min(6)));
tokio::select! {
_ = tokio::time::sleep(backoff) => {},
_ = cancel_rx.changed() => {
if *cancel_rx.borrow() {
return;
}
}
}
},
}
}
}
pub fn extract_ports(rt: crate::k8s::ResourceType, value: &serde_json::Value) -> Vec<PortInfo> {
use crate::k8s::ResourceType;
match rt {
| ResourceType::Service => extract_service_ports(value),
| ResourceType::Pod => extract_pod_ports(value),
| ResourceType::Deployment
| ResourceType::StatefulSet
| ResourceType::DaemonSet
| ResourceType::ReplicaSet
| ResourceType::Job
| ResourceType::CronJob => extract_template_ports(value),
| _ => Vec::new(),
}
}
fn extract_service_ports(value: &serde_json::Value) -> Vec<PortInfo> {
let ports = value
.get("spec")
.and_then(|s| s.get("ports"))
.and_then(|p| p.as_array());
match ports {
| Some(arr) => {
arr.iter()
.filter_map(|p| {
let target = p
.get("targetPort")
.and_then(|v| v.as_u64().or_else(|| v.as_str().and_then(|s| s.parse().ok())))
.unwrap_or_else(|| p.get("port").and_then(|v| v.as_u64()).unwrap_or(0));
if target == 0 || target > 65535 {
return None;
}
Some(PortInfo {
container_port: target as u16,
name: p.get("name").and_then(|v| v.as_str()).unwrap_or("").to_string(),
protocol: p.get("protocol").and_then(|v| v.as_str()).unwrap_or("TCP").to_string(),
})
})
.collect()
},
| None => Vec::new(),
}
}
fn extract_container_ports(containers: &serde_json::Value) -> Vec<PortInfo> {
let arr = match containers.as_array() {
| Some(a) => a,
| None => return Vec::new(),
};
arr.iter()
.flat_map(|c| {
c.get("ports")
.and_then(|p| p.as_array())
.into_iter()
.flatten()
.filter_map(|p| {
let port = p.get("containerPort").and_then(|v| v.as_u64())?;
if port == 0 || port > 65535 {
return None;
}
Some(PortInfo {
container_port: port as u16,
name: p.get("name").and_then(|v| v.as_str()).unwrap_or("").to_string(),
protocol: p.get("protocol").and_then(|v| v.as_str()).unwrap_or("TCP").to_string(),
})
})
})
.collect()
}
fn extract_pod_ports(value: &serde_json::Value) -> Vec<PortInfo> {
value
.get("spec")
.and_then(|s| s.get("containers"))
.map(extract_container_ports)
.unwrap_or_default()
}
fn extract_template_ports(value: &serde_json::Value) -> Vec<PortInfo> {
value
.get("spec")
.and_then(|s| s.get("template"))
.and_then(|t| t.get("spec"))
.and_then(|s| s.get("containers"))
.map(extract_container_ports)
.unwrap_or_default()
}
pub fn extract_selector(rt: crate::k8s::ResourceType, value: &serde_json::Value) -> Option<String> {
use crate::k8s::ResourceType;
match rt {
| ResourceType::Job => {
let name = value
.get("metadata")
.and_then(|m| m.get("name"))
.and_then(|n| n.as_str());
return name.map(|n| format!("job-name={}", n));
},
| ResourceType::CronJob => {
let labels = value
.get("spec")
.and_then(|s| s.get("jobTemplate"))
.and_then(|j| j.get("spec"))
.and_then(|s| s.get("template"))
.and_then(|t| t.get("metadata"))
.and_then(|m| m.get("labels"))
.and_then(|l| l.as_object());
return labels.map(|map| {
map.iter()
.map(|(k, v)| format!("{}={}", k, v.as_str().unwrap_or("")))
.collect::<Vec<_>>()
.join(",")
});
},
| _ => {},
}
let labels = match rt {
| ResourceType::Service => {
value
.get("spec")
.and_then(|s| s.get("selector"))
.and_then(|s| s.as_object())
},
| ResourceType::Deployment | ResourceType::StatefulSet | ResourceType::DaemonSet | ResourceType::ReplicaSet => {
value
.get("spec")
.and_then(|s| s.get("selector"))
.and_then(|s| s.get("matchLabels"))
.and_then(|m| m.as_object())
},
| _ => None,
};
labels.map(|map| {
map.iter()
.map(|(k, v)| format!("{}={}", k, v.as_str().unwrap_or("")))
.collect::<Vec<_>>()
.join(",")
})
}