use std::path::{Path, PathBuf};
use std::str::FromStr;
use k8s_csi::v1_3_0::node_client::NodeClient;
use k8s_csi::v1_3_0::node_service_capability::{rpc, Rpc, Type as CapabilityType};
use k8s_csi::v1_3_0::volume_capability::access_mode::Mode as CSIMode;
use k8s_csi::v1_3_0::volume_capability::{
AccessMode as CSIAccessMode, AccessType as CSIAccessType, MountVolume as CSIMountVolume,
};
use k8s_csi::v1_3_0::{
NodeGetCapabilitiesRequest, NodePublishVolumeRequest, NodeStageVolumeRequest,
NodeUnpublishVolumeRequest, VolumeCapability,
};
use k8s_openapi::api::core::v1::{
CSIPersistentVolumeSource, PersistentVolume, PersistentVolumeClaimSpec,
PersistentVolumeClaimVolumeSource,
};
use k8s_openapi::api::storage::v1::StorageClass;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use tempdir::TempDir;
use thiserror::Error;
use crate::grpc_sock;
use crate::plugin_watcher::PluginRegistry;
use super::*;
/// Errors produced when a string-valued volume field cannot be parsed into
/// one of the enums declared in this file.
#[derive(Error, Debug)]
// Every variant shares the `Bad` prefix, which is what this lint objects to.
#[allow(clippy::enum_variant_names)]
enum VolumeError {
/// The string is not a value accepted by `VolumeMode::from_str`.
#[error("bad volume mode")]
BadVolumeMode,
/// The string is not a value accepted by `ReclaimPolicy::from_str`.
#[error("bad reclaim policy")]
BadReclaimPolicy,
/// The string is not a value accepted by `AccessMode::from_str`.
#[error("bad access mode")]
BadAccessMode,
}
/// Volume mode of a persistent volume, parsed from its string form.
#[derive(Debug)]
enum VolumeMode {
    Block,
    Filesystem,
}

impl FromStr for VolumeMode {
    type Err = VolumeError;

    /// Parses `"Block"` or `"Filesystem"`. An empty string is treated as
    /// `Filesystem`; anything else yields `VolumeError::BadVolumeMode`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "Block" => Ok(Self::Block),
            // The empty string falls back to the filesystem mode.
            "" | "Filesystem" => Ok(Self::Filesystem),
            _ => Err(VolumeError::BadVolumeMode),
        }
    }
}
/// Reclaim policy of a persistent volume, parsed from its string form.
#[derive(Debug)]
enum ReclaimPolicy {
    Delete,
    Recycle,
    Retain,
}

impl FromStr for ReclaimPolicy {
    type Err = VolumeError;

    /// Parses `"Delete"`, `"Recycle"` or `"Retain"`. An empty string is
    /// treated as `Delete`; anything else yields
    /// `VolumeError::BadReclaimPolicy`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            // The empty string falls back to `Delete`.
            "" | "Delete" => Ok(Self::Delete),
            "Recycle" => Ok(Self::Recycle),
            "Retain" => Ok(Self::Retain),
            _ => Err(VolumeError::BadReclaimPolicy),
        }
    }
}
/// Access mode requested by a PersistentVolumeClaim, parsed from its string
/// form.
#[derive(Debug)]
// Every variant starts with `Read`, which is what this lint objects to.
#[allow(clippy::enum_variant_names)]
enum AccessMode {
    ReadOnlyMany,
    ReadWriteMany,
    ReadWriteOnce,
}

impl FromStr for AccessMode {
    type Err = VolumeError;

    /// Parses `"ReadOnlyMany"`, `"ReadWriteMany"` or `"ReadWriteOnce"`.
    /// Unlike the other enums here there is no default: any other input,
    /// including the empty string, yields `VolumeError::BadAccessMode`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mode = match s {
            "ReadOnlyMany" => Self::ReadOnlyMany,
            "ReadWriteMany" => Self::ReadWriteMany,
            "ReadWriteOnce" => Self::ReadWriteOnce,
            _ => return Err(VolumeError::BadAccessMode),
        };
        Ok(mode)
    }
}
pub(crate) fn validate(spec: &PersistentVolumeClaimSpec) -> anyhow::Result<()> {
match &spec.access_modes {
Some(a) => {
if a.is_empty() {
return Err(anyhow::anyhow!("at least 1 access mode is required"));
} else {
for access_mode in a {
AccessMode::from_str(access_mode)?;
}
}
}
None => {
return Err(anyhow::anyhow!("at least 1 access mode is required"));
}
}
if let Some(selector) = &spec.selector {
validate_label_selector(selector)?;
}
match &spec.storage_class_name {
None => {
return Err(anyhow::anyhow!(
"PersistentVolumeClaim must specify a storage class"
));
}
Some(s) => {
if s.is_empty() {
return Err(anyhow::anyhow!(
"PersistentVolumeClaim must specify a storage class"
));
}
s
}
};
Ok(())
}
/// Validates the label selector of a PersistentVolumeClaim.
///
/// Currently a placeholder: the selector is not inspected and every selector
/// is accepted.
// The `Result` return type is presumably kept so the caller can propagate
// with `?` once real checks exist, hence the lint suppression.
#[allow(clippy::unnecessary_wraps)]
fn validate_label_selector(_selector: &LabelSelector) -> anyhow::Result<()> {
Ok(())
}
/// Mounts the volume behind a PersistentVolumeClaim at `path` through the
/// CSI node plugin registered for the claim's storage class.
///
/// The steps are: look up and validate the PVC spec, connect to the CSI
/// plugin, resolve the PV's CSI source, create `path`, optionally stage the
/// volume (only when the plugin reports stage/unstage support), and publish
/// it at `path`.
///
/// # Errors
///
/// Returns an error when `pr` is `None` (no plugin registry available) or
/// when any of the lookups, filesystem operations or CSI calls fails.
pub(crate) async fn populate(
    pvc_source: &PersistentVolumeClaimVolumeSource,
    client: &kube::Client,
    namespace: &str,
    pr: Option<Arc<PluginRegistry>>,
    path: &PathBuf,
) -> anyhow::Result<VolumeType> {
    let plugin_registry = match pr {
        Some(registry) => registry,
        None => {
            return Err(anyhow::anyhow!(format!(
                "failed to mount volume {}: CSI driver support not implemented",
                &pvc_source.claim_name
            )));
        }
    };

    let spec = get_pvc_spec(pvc_source, client, namespace).await?;
    let mut csi_client = get_csi_client(client, &spec, plugin_registry).await?;
    let csi = get_csi(client, pvc_source, &spec).await?;
    let stage_unstage_volume = supports_stage_unstage(&mut csi_client).await?;

    tokio::fs::create_dir_all(path).await?;

    // NOTE(review): `staging_dir` is dropped when this function returns,
    // which presumably removes the temporary directory; confirm the driver
    // does not need the staging directory to outlive this call.
    let staging_dir = TempDir::new(&csi.volume_handle)?;
    let staging_path = staging_dir.path();
    if stage_unstage_volume {
        stage_volume(&mut csi_client, &csi, staging_path).await?;
    }

    publish_volume(
        &mut csi_client,
        &csi,
        staging_path,
        stage_unstage_volume,
        path,
    )
    .await?;

    Ok(VolumeType::PersistentVolumeClaim)
}
pub(crate) async fn unpopulate(
pvc_source: &PersistentVolumeClaimVolumeSource,
client: &kube::Client,
namespace: &str,
pr: Option<Arc<PluginRegistry>>,
path: &PathBuf,
) -> anyhow::Result<()> {
if pr.is_none() {
return Err(anyhow::anyhow!(format!(
"failed to unmount volume {}: CSI driver support not implemented",
&pvc_source.claim_name
)));
}
let plugin_registry = pr.unwrap();
let spec = get_pvc_spec(pvc_source, client, namespace).await?;
let mut csi_client = get_csi_client(client, &spec, plugin_registry).await?;
let csi = get_csi(client, pvc_source, &spec).await?;
unpublish_volume(&mut csi_client, &csi, path).await?;
std::fs::remove_dir_all(path)?;
Ok(())
}
/// Asks the CSI node plugin for its capabilities and reports whether the
/// `StageUnstageVolume` RPC capability is among them.
///
/// # Errors
///
/// Returns an error when the `NodeGetCapabilities` call fails.
async fn supports_stage_unstage(
    csi_client: &mut NodeClient<tonic::transport::Channel>,
) -> anyhow::Result<bool> {
    let response = csi_client
        .node_get_capabilities(NodeGetCapabilitiesRequest {})
        .await?;

    let stage_unstage = rpc::Type::StageUnstageVolume as i32;

    // The capability type must be compared explicitly with a guard: putting
    // a local variable name in the pattern would bind a new variable and
    // match every RPC capability, not just the stage/unstage one.
    let supported = response.get_ref().capabilities.iter().any(|capability| {
        matches!(
            &capability.r#type,
            Some(CapabilityType::Rpc(Rpc { r#type: rpc_type })) if *rpc_type == stage_unstage
        )
    });

    Ok(supported)
}
/// Issues a CSI `NodeStageVolume` request for the given volume, using
/// `staging_path` as the staging target.
///
/// The request always asks for a mount-type volume with the
/// `SingleNodeWriter` access mode; the filesystem type comes from the PV's
/// CSI source and defaults to the empty string. Secrets and the publish and
/// volume contexts are sent empty.
///
/// # Errors
///
/// Returns an error when the CSI call fails.
async fn stage_volume(
    csi_client: &mut NodeClient<tonic::transport::Channel>,
    csi: &CSIPersistentVolumeSource,
    staging_path: &Path,
) -> anyhow::Result<()> {
    let mount = CSIMountVolume {
        fs_type: csi.fs_type.clone().unwrap_or_default(),
        mount_flags: Default::default(),
    };
    let capability = VolumeCapability {
        access_mode: Some(CSIAccessMode {
            mode: CSIMode::SingleNodeWriter as i32,
        }),
        access_type: Some(CSIAccessType::Mount(mount)),
    };
    let request = NodeStageVolumeRequest {
        volume_id: csi.volume_handle.clone(),
        staging_target_path: staging_path.to_string_lossy().to_string(),
        volume_capability: Some(capability),
        secrets: Default::default(),
        publish_context: Default::default(),
        volume_context: Default::default(),
    };

    csi_client.node_stage_volume(request).await?;
    Ok(())
}
/// Issues a CSI `NodePublishVolume` request that publishes the volume at
/// `path`.
///
/// When `stage_unstage_volume` is true the request carries `staging_path` as
/// its staging target; otherwise the staging target is left empty. The
/// volume is requested read-write, mount-type, with the `SingleNodeWriter`
/// access mode. Secrets and the publish and volume contexts are sent empty.
///
/// # Errors
///
/// Returns an error when the CSI call fails.
async fn publish_volume(
    csi_client: &mut NodeClient<tonic::transport::Channel>,
    csi: &CSIPersistentVolumeSource,
    staging_path: &Path,
    stage_unstage_volume: bool,
    path: &PathBuf,
) -> anyhow::Result<()> {
    // Only forward the staging path when the volume was actually staged.
    let staging_target_path = if stage_unstage_volume {
        staging_path.to_string_lossy().to_string()
    } else {
        "".to_owned()
    };

    let capability = VolumeCapability {
        access_mode: Some(CSIAccessMode {
            mode: CSIMode::SingleNodeWriter as i32,
        }),
        access_type: Some(CSIAccessType::Mount(CSIMountVolume {
            fs_type: csi.fs_type.clone().unwrap_or_default(),
            mount_flags: Default::default(),
        })),
    };

    let request = NodePublishVolumeRequest {
        volume_id: csi.volume_handle.clone(),
        target_path: path.to_string_lossy().to_string(),
        staging_target_path,
        volume_capability: Some(capability),
        readonly: false,
        secrets: Default::default(),
        publish_context: Default::default(),
        volume_context: Default::default(),
    };

    csi_client.node_publish_volume(request).await?;
    Ok(())
}
/// Issues a CSI `NodeUnpublishVolume` request for the volume published at
/// `path`.
///
/// # Errors
///
/// Returns an error when the CSI call fails.
async fn unpublish_volume(
    csi_client: &mut NodeClient<tonic::transport::Channel>,
    csi: &CSIPersistentVolumeSource,
    path: &PathBuf,
) -> anyhow::Result<()> {
    let volume_id = csi.volume_handle.clone();
    let target_path = path.to_string_lossy().to_string();

    csi_client
        .node_unpublish_volume(NodeUnpublishVolumeRequest {
            volume_id,
            target_path,
        })
        .await?;
    Ok(())
}
/// Builds a CSI node client for the plugin that provisions the claim's
/// storage class.
///
/// The storage class named in `spec` is fetched from the cluster, its
/// provisioner is looked up in `plugin_registry` to obtain the plugin's
/// endpoint, and a gRPC channel is opened to that endpoint.
///
/// # Errors
///
/// Returns an error when the spec names no storage class (absent or empty),
/// when the storage class cannot be fetched, when no endpoint is registered
/// for its provisioner, or when the channel cannot be opened.
async fn get_csi_client(
    client: &kube::Client,
    spec: &PersistentVolumeClaimSpec,
    plugin_registry: Arc<PluginRegistry>,
) -> anyhow::Result<NodeClient<tonic::transport::Channel>> {
    // Report a missing storage class as an error instead of panicking, so
    // this function does not depend on the spec having been validated first.
    let storage_class_name = spec
        .storage_class_name
        .as_deref()
        .filter(|name| !name.is_empty())
        .ok_or_else(|| anyhow::anyhow!("PersistentVolumeClaim must specify a storage class"))?;

    let storage_class_client: Api<StorageClass> = Api::all(client.clone());
    let storage_class = storage_class_client.get(storage_class_name).await?;

    let endpoint = plugin_registry
        .get_endpoint(&storage_class.provisioner)
        .await
        .ok_or_else(|| anyhow::anyhow!("could not get CSI plugin endpoint"))?;

    let chan = grpc_sock::client::socket_channel(endpoint).await?;
    Ok(NodeClient::new(chan))
}
/// Resolves the CSI source of the PersistentVolume bound to a claim.
///
/// The volume name is read from the PVC `spec`, the PersistentVolume with
/// that name is fetched from the cluster, and its `spec.csi` section is
/// returned.
///
/// # Errors
///
/// Returns an error when the PVC spec has no volume name, when the
/// PersistentVolume cannot be fetched, or when it has no spec or no CSI
/// section.
async fn get_csi(
    client: &kube::Client,
    pvc_source: &PersistentVolumeClaimVolumeSource,
    spec: &PersistentVolumeClaimSpec,
) -> anyhow::Result<CSIPersistentVolumeSource> {
    // Build the error lazily so the success path does not pay for formatting
    // and error construction.
    let volume_name = spec.volume_name.as_ref().ok_or_else(|| {
        anyhow::anyhow!(format!(
            "volume name for PVC {} must exist",
            pvc_source.claim_name
        ))
    })?;

    let pv_client: Api<PersistentVolume> = Api::all(client.clone());
    let pv = pv_client.get(volume_name).await?;

    let csi = pv
        .spec
        .ok_or_else(|| anyhow::anyhow!("no PersistentVolume spec defined"))?
        .csi
        .ok_or_else(|| anyhow::anyhow!("no CSI spec defined"))?;
    Ok(csi)
}
/// Fetches the named PersistentVolumeClaim from `namespace` and returns its
/// spec after running it through `validate`.
///
/// # Errors
///
/// Returns an error when the claim cannot be fetched, when it has no spec,
/// or when `validate` rejects the spec.
async fn get_pvc_spec(
    pvc_source: &PersistentVolumeClaimVolumeSource,
    client: &kube::Client,
    namespace: &str,
) -> anyhow::Result<PersistentVolumeClaimSpec> {
    let pvc_client: Api<PersistentVolumeClaim> = Api::namespaced(client.clone(), namespace);
    let claim = pvc_client.get(&pvc_source.claim_name).await?;

    let spec = claim
        .spec
        .ok_or_else(|| anyhow::anyhow!("PersistentVolumeClaim must specify a spec"))?;

    validate(&spec)?;
    Ok(spec)
}