#![allow(clippy::result_large_err)]
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use cfgd_core::PathDisplayExt;
use tonic::{Request, Response, Status};
use crate::cache::Cache;
use crate::csi::v1::node_server::Node;
use crate::csi::v1::{
NodeExpandVolumeRequest, NodeExpandVolumeResponse, NodeGetCapabilitiesRequest,
NodeGetCapabilitiesResponse, NodeGetInfoRequest, NodeGetInfoResponse,
NodeGetVolumeStatsRequest, NodeGetVolumeStatsResponse, NodePublishVolumeRequest,
NodePublishVolumeResponse, NodeServiceCapability, NodeStageVolumeRequest,
NodeStageVolumeResponse, NodeUnpublishVolumeRequest, NodeUnpublishVolumeResponse,
NodeUnstageVolumeRequest, NodeUnstageVolumeResponse, VolumeUsage, node_service_capability,
volume_usage,
};
use crate::metrics::{CsiMetrics, ModuleLabels, PublishLabels, PullLabels};
/// Name of the environment variable holding the CSI registry allow-list.
///
/// The value is read once when a [`CfgdNode`] is constructed and is parsed as a
/// comma-separated list of registry `host[:port]` entries. An unset, blank, or
/// `*` value disables the allow-list.
pub const ALLOWED_REGISTRIES_ENV: &str = "CFGD_CSI_ALLOWED_REGISTRIES";
/// CSI `Node` service implementation for cfgd modules.
///
/// Holds the shared state the RPC handlers need: the module cache, the metrics
/// registry, this node's identifier and the optional registry allow-list.
pub struct CfgdNode {
/// Shared module cache; handlers call `get` / `get_or_pull` on it.
cache: Arc<Cache>,
/// Shared metrics handles updated by the stage and publish handlers.
metrics: Arc<CsiMetrics>,
/// Identifier returned verbatim from `NodeGetInfo`.
node_id: String,
/// Registry allow-list parsed from the environment at construction time.
/// `None` means no restriction is applied.
allowed_registries: Option<Vec<String>>,
}
impl CfgdNode {
    /// Builds a node service around the given cache, metrics and node id.
    ///
    /// The registry allow-list is read from the environment exactly once here,
    /// and its state is logged so operators can see which mode is active:
    /// unrestricted, explicitly empty, or active with `count` entries.
    pub fn new(cache: Arc<Cache>, metrics: Arc<CsiMetrics>, node_id: String) -> Self {
        let allowed_registries = parse_allowed_registries_from_env();

        // Match on the slice view so the "configured but empty" case is a
        // plain pattern rather than a guard.
        match allowed_registries.as_deref() {
            None => tracing::warn!(
                env = ALLOWED_REGISTRIES_ENV,
                "CSI registry allow-list is not configured — accepting any ociRef from volume context. In multi-tenant clusters set this env (comma-separated host[:port]) to restrict pulls."
            ),
            Some([]) => tracing::warn!(
                env = ALLOWED_REGISTRIES_ENV,
                "CSI registry allow-list is explicitly empty — all module pulls will be refused."
            ),
            Some(entries) => tracing::info!(
                env = ALLOWED_REGISTRIES_ENV,
                count = entries.len(),
                "CSI registry allow-list active"
            ),
        }

        Self {
            cache,
            metrics,
            node_id,
            allowed_registries,
        }
    }
}
/// Reads the registry allow-list from [`ALLOWED_REGISTRIES_ENV`].
///
/// Returns `None` (no restriction) when the variable is unset, not valid
/// Unicode, blank, or exactly `*`. Otherwise returns the comma-separated
/// entries, each trimmed, with empty entries dropped. The resulting list may
/// itself be empty, e.g. for a value such as `","`.
fn parse_allowed_registries_from_env() -> Option<Vec<String>> {
    let raw = std::env::var(ALLOWED_REGISTRIES_ENV).ok()?;
    let spec = raw.trim();
    if matches!(spec, "" | "*") {
        return None;
    }

    let mut registries = Vec::new();
    for piece in spec.split(',') {
        let entry = piece.trim();
        if !entry.is_empty() {
            registries.push(entry.to_string());
        }
    }
    Some(registries)
}
/// Extracts the registry host (with optional port) from an OCI reference.
///
/// The first `/`-separated component is treated as a registry only when it
/// looks like a host: it contains a `.` or a `:`, or is exactly `localhost`.
/// A reference without any `/` has no registry component at all — its `:` or
/// `@` belongs to the tag or digest — so the result is empty.
///
/// * `ghcr.io/acme/mod:v1` → `ghcr.io`
/// * `localhost:5000/mod` → `localhost:5000`
/// * `cfgd-modules/mod:v1` → `` (no explicit registry)
/// * `mod:v1` → `` (no explicit registry)
///
/// Returns a slice of `oci_ref`, or `""` when no registry is present.
fn registry_of(oci_ref: &str) -> &str {
    match oci_ref.split_once('/') {
        Some((head, _)) if head.contains('.') || head.contains(':') || head == "localhost" => head,
        // Either there is no path separator (bare `name[:tag]` / `name@digest`)
        // or the first component is an ordinary repository namespace.
        _ => "",
    }
}
/// Looks up a mandatory volume-context attribute.
///
/// Returns the value borrowed from `attrs`. A missing key and an empty value
/// are treated the same way: both yield an `invalid_argument` status naming
/// the attribute.
fn require_attr<'a>(attrs: &'a HashMap<String, String>, key: &str) -> Result<&'a str, Status> {
    match attrs.get(key) {
        Some(value) if !value.is_empty() => Ok(value.as_str()),
        _ => Err(Status::invalid_argument(format!(
            "missing required volume attribute: {key}"
        ))),
    }
}
/// Rejects requests whose `volume_id` is empty with an `invalid_argument` status.
fn require_volume_id(volume_id: &str) -> Result<(), Status> {
    match volume_id {
        "" => Err(Status::invalid_argument("volume_id is required")),
        _ => Ok(()),
    }
}
/// Determines which OCI reference to pull for a module.
///
/// A non-empty `ociRef` volume attribute wins; otherwise the reference
/// defaults to `cfgd-modules/{module}:{version}`.
fn resolve_oci_ref(attrs: &HashMap<String, String>, module: &str, version: &str) -> String {
    match attrs.get("ociRef") {
        Some(explicit) if !explicit.is_empty() => explicit.clone(),
        _ => format!("cfgd-modules/{module}:{version}"),
    }
}
/// Enforces the registry allow-list for an OCI reference.
///
/// The reference is accepted when no allow-list is configured, when it names
/// no explicit registry host (see `registry_of`), or when its registry is an
/// exact string match for one of the allow-list entries. Anything else is
/// refused with a `permission_denied` status.
///
/// Note that host-less references are accepted even when the allow-list is
/// present but empty.
fn check_registry_allowed(
    oci_ref: &str,
    allowed_registries: Option<&[String]>,
) -> Result<(), Status> {
    let registry = registry_of(oci_ref);
    let permitted = match allowed_registries {
        None => true,
        Some(allow_list) => {
            registry.is_empty() || allow_list.iter().any(|entry| entry.as_str() == registry)
        }
    };

    if permitted {
        Ok(())
    } else {
        Err(Status::permission_denied(format!(
            "registry '{registry}' is not in the CSI allow-list (set {ALLOWED_REGISTRIES_ENV})"
        )))
    }
}
#[tonic::async_trait]
impl Node for CfgdNode {
async fn node_stage_volume(
&self,
request: Request<NodeStageVolumeRequest>,
) -> Result<Response<NodeStageVolumeResponse>, Status> {
let req = request.into_inner();
require_volume_id(&req.volume_id)?;
if req.staging_target_path.is_empty() {
return Err(Status::invalid_argument("staging_target_path is required"));
}
if let Err(e) = cfgd_core::validate_no_traversal(Path::new(&req.staging_target_path)) {
return Err(Status::invalid_argument(format!(
"staging_target_path traversal rejected: {e}"
)));
}
let attrs = &req.volume_context;
let module = require_attr(attrs, "module")?;
let version = require_attr(attrs, "version")?;
let oci_ref = resolve_oci_ref(attrs, module, version);
check_registry_allowed(&oci_ref, self.allowed_registries.as_deref())?;
tracing::info!(
module = module,
version = version,
volume_id = req.volume_id,
"staging volume — pulling to cache"
);
let start = std::time::Instant::now();
let cached = self.cache.get(module, version).is_some();
self.cache
.get_or_pull(module, version, &oci_ref)
.map_err(|e| Status::internal(format!("cache pull failed: {e}")))?;
let duration = start.elapsed().as_secs_f64();
self.metrics
.pull_duration_seconds
.get_or_create(&PullLabels {
module: module.to_string(),
cached: cached.to_string(),
})
.observe(duration);
if cached {
self.metrics
.cache_hits_total
.get_or_create(&ModuleLabels {
module: module.to_string(),
})
.inc();
}
self.metrics
.cache_size_bytes
.set(self.cache.current_size_bytes() as i64);
Ok(Response::new(NodeStageVolumeResponse {}))
}
async fn node_unstage_volume(
&self,
request: Request<NodeUnstageVolumeRequest>,
) -> Result<Response<NodeUnstageVolumeResponse>, Status> {
let req = request.into_inner();
require_volume_id(&req.volume_id)?;
tracing::debug!(
volume_id = req.volume_id,
"unstage volume (no-op, cache persists)"
);
Ok(Response::new(NodeUnstageVolumeResponse {}))
}
async fn node_publish_volume(
&self,
request: Request<NodePublishVolumeRequest>,
) -> Result<Response<NodePublishVolumeResponse>, Status> {
let req = request.into_inner();
require_volume_id(&req.volume_id)?;
let attrs = &req.volume_context;
let module = require_attr(attrs, "module")?;
let version = require_attr(attrs, "version")?;
let target_path = &req.target_path;
if target_path.is_empty() {
return Err(Status::invalid_argument("target_path is required"));
}
if let Err(e) = cfgd_core::validate_no_traversal(Path::new(target_path)) {
return Err(Status::invalid_argument(format!(
"target_path traversal rejected: {e}"
)));
}
let target = Path::new(target_path);
if is_mountpoint(target) {
if is_readonly_mount(target) {
tracing::debug!(
target = target_path,
"already mounted read-only, returning success"
);
return Ok(Response::new(NodePublishVolumeResponse {}));
}
tracing::warn!(
target = target_path,
"mount exists but is not read-only, attempting remount"
);
#[cfg(target_os = "linux")]
{
use nix::mount::{MsFlags, mount};
mount(
None::<&str>,
target,
None::<&str>,
MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
None::<&str>,
)
.map_err(|e| Status::internal(format!("read-only remount failed: {e}")))?;
}
return Ok(Response::new(NodePublishVolumeResponse {}));
}
tracing::info!(
module = module,
version = version,
target = target_path,
volume_id = req.volume_id,
"publishing volume"
);
let oci_ref = resolve_oci_ref(attrs, module, version);
check_registry_allowed(&oci_ref, self.allowed_registries.as_deref())?;
let cache = Arc::clone(&self.cache);
let metrics = Arc::clone(&self.metrics);
let module = module.to_string();
let version = version.to_string();
let oci_ref_owned = oci_ref.clone();
let target_path_owned: std::path::PathBuf = target.to_path_buf();
tokio::task::spawn_blocking(move || {
let source = cache
.get_or_pull(&module, &version, &oci_ref_owned)
.map_err(|e| Status::internal(format!("cache pull failed: {e}")))?;
std::fs::create_dir_all(&target_path_owned)
.map_err(|e| Status::internal(format!("cannot create target dir: {e}")))?;
match bind_mount_readonly(&source, &target_path_owned) {
Ok(()) => {
metrics
.volume_publish_total
.get_or_create(&PublishLabels {
module: module.clone(),
result: "success".to_string(),
})
.inc();
Ok(())
}
Err(e) => {
metrics
.volume_publish_total
.get_or_create(&PublishLabels {
module: module.clone(),
result: "error".to_string(),
})
.inc();
if let Err(rm_err) = std::fs::remove_dir(&target_path_owned) {
tracing::warn!(
error = %rm_err,
target = %target_path_owned.posix(),
"failed to remove mount target after bind_mount failure",
);
}
Err(e)
}
}
})
.await
.map_err(|e| Status::internal(format!("publish task join failed: {e}")))??;
Ok(Response::new(NodePublishVolumeResponse {}))
}
async fn node_unpublish_volume(
&self,
request: Request<NodeUnpublishVolumeRequest>,
) -> Result<Response<NodeUnpublishVolumeResponse>, Status> {
let req = request.into_inner();
require_volume_id(&req.volume_id)?;
let target_path = &req.target_path;
if target_path.is_empty() {
return Err(Status::invalid_argument("target_path is required"));
}
if let Err(e) = cfgd_core::validate_no_traversal(std::path::Path::new(target_path)) {
return Err(Status::invalid_argument(format!(
"target_path traversal rejected: {e}"
)));
}
tracing::info!(
target = target_path,
volume_id = req.volume_id,
"unpublishing volume"
);
let target_path_owned: std::path::PathBuf = target_path.into();
tokio::task::spawn_blocking(move || -> Result<(), Status> {
unmount(&target_path_owned)?;
if let Err(e) = std::fs::remove_dir(&target_path_owned) {
tracing::warn!(target = %target_path_owned.posix(), error = %e, "failed to remove target directory after unmount");
}
Ok(())
})
.await
.map_err(|e| Status::internal(format!("unpublish task join failed: {e}")))??;
Ok(Response::new(NodeUnpublishVolumeResponse {}))
}
async fn node_get_volume_stats(
&self,
request: Request<NodeGetVolumeStatsRequest>,
) -> Result<Response<NodeGetVolumeStatsResponse>, Status> {
let req = request.into_inner();
require_volume_id(&req.volume_id)?;
let volume_path = &req.volume_path;
if volume_path.is_empty() {
return Err(Status::invalid_argument("volume_path is required"));
}
let path = Path::new(volume_path);
if !path.exists() {
return Err(Status::not_found(format!(
"volume path does not exist: {volume_path}"
)));
}
let (bytes, inodes) = walk_volume_stats(path);
tracing::debug!(
volume_id = req.volume_id,
volume_path = volume_path,
bytes = bytes,
inodes = inodes,
"volume stats"
);
Ok(Response::new(NodeGetVolumeStatsResponse {
usage: vec![
VolumeUsage {
total: bytes as i64,
used: bytes as i64,
available: 0,
unit: volume_usage::Unit::Bytes as i32,
},
VolumeUsage {
total: inodes as i64,
used: inodes as i64,
available: 0,
unit: volume_usage::Unit::Inodes as i32,
},
],
volume_condition: None,
}))
}
async fn node_expand_volume(
&self,
_request: Request<NodeExpandVolumeRequest>,
) -> Result<Response<NodeExpandVolumeResponse>, Status> {
Err(Status::unimplemented("NodeExpandVolume not supported"))
}
async fn node_get_capabilities(
&self,
_request: Request<NodeGetCapabilitiesRequest>,
) -> Result<Response<NodeGetCapabilitiesResponse>, Status> {
tracing::debug!("NodeGetCapabilities called");
Ok(Response::new(NodeGetCapabilitiesResponse {
capabilities: vec![
NodeServiceCapability {
r#type: Some(node_service_capability::Type::Rpc(
node_service_capability::Rpc {
r#type: node_service_capability::rpc::Type::StageUnstageVolume.into(),
},
)),
},
NodeServiceCapability {
r#type: Some(node_service_capability::Type::Rpc(
node_service_capability::Rpc {
r#type: node_service_capability::rpc::Type::GetVolumeStats.into(),
},
)),
},
],
}))
}
async fn node_get_info(
&self,
_request: Request<NodeGetInfoRequest>,
) -> Result<Response<NodeGetInfoResponse>, Status> {
tracing::debug!(node_id = %self.node_id, "NodeGetInfo called");
Ok(Response::new(NodeGetInfoResponse {
node_id: self.node_id.clone(),
max_volumes_per_node: 0, accessible_topology: None,
}))
}
}
/// Reports whether `path` is currently a mount point.
///
/// On Linux the quick check compares the device number of `path` with that of
/// its parent. That alone cannot see a bind mount whose source lives on the
/// same filesystem, so when the device numbers match the kernel's mount table
/// (`/proc/self/mountinfo`) is consulted as well.
///
/// Returns `false` when `path` or its parent cannot be inspected, `true` for
/// a path without a parent, and always `false` on non-Linux targets.
fn is_mountpoint(path: &Path) -> bool {
    #[cfg(target_os = "linux")]
    {
        use std::os::unix::fs::MetadataExt;
        let Ok(path_meta) = std::fs::metadata(path) else {
            return false;
        };
        let Some(parent) = path.parent() else {
            return true;
        };
        let Ok(parent_meta) = std::fs::metadata(parent) else {
            return false;
        };
        if path_meta.dev() != parent_meta.dev() {
            return true;
        }
        // Same device as the parent: this may still be a same-filesystem bind
        // mount. mountinfo lists symlink-resolved paths, so resolve `path` too.
        let Ok(resolved) = std::fs::canonicalize(path) else {
            return false;
        };
        std::fs::read("/proc/self/mountinfo")
            .map(|raw| mountinfo_lists(&String::from_utf8_lossy(&raw), &resolved))
            .unwrap_or(false)
    }
    #[cfg(not(target_os = "linux"))]
    {
        let _ = path;
        false
    }
}

/// Returns `true` when `target` appears as a mount point (fifth field) in the
/// given `/proc/<pid>/mountinfo` text.
#[cfg(target_os = "linux")]
fn mountinfo_lists(mountinfo: &str, target: &Path) -> bool {
    use std::os::unix::ffi::OsStrExt;
    let wanted = target.as_os_str().as_bytes();
    mountinfo
        .lines()
        .filter_map(|line| line.split(' ').nth(4))
        .any(|field| unescape_mountinfo_field(field) == wanted)
}

/// Decodes the `\NNN` octal escapes the kernel uses in mountinfo paths
/// (for example `\040` for a space).
#[cfg(target_os = "linux")]
fn unescape_mountinfo_field(field: &str) -> Vec<u8> {
    let mut decoded = Vec::with_capacity(field.len());
    let mut rest = field.as_bytes();
    while let Some((&byte, tail)) = rest.split_first() {
        let is_escape = byte == b'\\'
            && tail.len() >= 3
            && tail[0] <= b'3'
            && tail[..3].iter().all(|d| (b'0'..=b'7').contains(d));
        if is_escape {
            // First digit is at most 3, so the value always fits in a byte.
            decoded.push(tail[..3].iter().fold(0u8, |acc, d| acc * 8 + (d - b'0')));
            rest = &tail[3..];
        } else {
            decoded.push(byte);
            rest = tail;
        }
    }
    decoded
}
/// Reports whether the filesystem mounted at `path` carries the read-only flag.
///
/// On Linux this asks `statvfs` and checks for `ST_RDONLY`; a failed
/// `statvfs` call counts as "not read-only". On other targets the answer is
/// always `false`.
fn is_readonly_mount(path: &Path) -> bool {
    #[cfg(target_os = "linux")]
    {
        use nix::sys::statvfs::{FsFlags, statvfs};
        matches!(statvfs(path), Ok(stat) if stat.flags().contains(FsFlags::ST_RDONLY))
    }
    #[cfg(not(target_os = "linux"))]
    {
        let _ = path;
        false
    }
}
/// Bind-mounts `source` onto `target` and then flips the mount to read-only.
///
/// Two `mount(2)` calls are made: a plain `MS_BIND`, followed by a
/// `MS_REMOUNT | MS_BIND | MS_RDONLY` remount. If the remount fails, the
/// fresh bind mount is lazily detached (best effort, failure only logged) so
/// a writable mount is not left behind.
///
/// # Errors
/// Returns an `internal` status when either mount call fails.
#[cfg(target_os = "linux")]
fn bind_mount_readonly(source: &Path, target: &Path) -> Result<(), Status> {
    use nix::mount::{MntFlags, MsFlags, mount, umount2};

    // Placeholder for the mount(2) arguments this operation does not use.
    const UNUSED: Option<&str> = None;

    mount(Some(source), target, UNUSED, MsFlags::MS_BIND, UNUSED)
        .map_err(|e| Status::internal(format!("bind mount failed: {e}")))?;

    let readonly_remount = MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY;
    let Err(remount_err) = mount(UNUSED, target, UNUSED, readonly_remount, UNUSED) else {
        return Ok(());
    };

    if let Err(umount_err) = umount2(target, MntFlags::MNT_DETACH) {
        tracing::debug!(
            error = %umount_err,
            target = %target.display(),
            "best-effort cleanup umount2 failed after read-only remount error",
        );
    }
    Err(Status::internal(format!(
        "read-only remount failed: {remount_err}"
    )))
}

/// Non-Linux stand-in: bind mounts are unavailable, so this always returns an
/// `unimplemented` status.
#[cfg(not(target_os = "linux"))]
fn bind_mount_readonly(_source: &Path, _target: &Path) -> Result<(), Status> {
    Err(Status::unimplemented("bind mount only supported on Linux"))
}
/// Lazily detaches whatever is mounted at `target` (`umount2` with
/// `MNT_DETACH`).
///
/// `EINVAL`, `ENOENT` and `EPERM` are treated as success, which keeps the
/// call repeatable when nothing is mounted or the path is already gone.
/// NOTE(review): swallowing `EPERM` presumably exists for unprivileged
/// environments — confirm, since a genuinely refused unmount is then reported
/// as success.
///
/// # Errors
/// Any other errno is returned as an `internal` status.
#[cfg(target_os = "linux")]
fn unmount(target: &Path) -> Result<(), Status> {
    use nix::errno::Errno;
    use nix::mount::{MntFlags, umount2};

    let Err(errno) = umount2(target, MntFlags::MNT_DETACH) else {
        return Ok(());
    };
    if matches!(errno, Errno::EINVAL | Errno::ENOENT | Errno::EPERM) {
        return Ok(());
    }
    Err(Status::internal(format!("unmount failed: {errno}")))
}

/// Non-Linux stand-in: there is nothing to unmount, so this always succeeds.
#[cfg(not(target_os = "linux"))]
fn unmount(_target: &Path) -> Result<(), Status> {
    Ok(())
}
/// Measures the tree rooted at `path`, returning `(bytes, inodes)`.
///
/// `inodes` counts the root itself plus every directory entry found beneath
/// it. `bytes` is the saturating sum of the sizes of all non-directory
/// entries; symlinks contribute their own length and are never followed.
/// Directories that cannot be read and entries whose metadata cannot be
/// fetched are skipped silently (unreadable entries still count as an inode).
fn walk_volume_stats(path: &Path) -> (u64, u64) {
    let mut total_bytes = 0u64;
    // Start at one to account for the root itself.
    let mut inode_count = 1u64;

    // Explicit work list of directories still to be listed.
    let mut pending = vec![path.to_path_buf()];
    while let Some(dir) = pending.pop() {
        let Ok(listing) = std::fs::read_dir(&dir) else {
            continue;
        };
        for child in listing.flatten() {
            inode_count += 1;
            let child_path = child.path();
            let Ok(meta) = child_path.symlink_metadata() else {
                continue;
            };
            // `symlink_metadata` does not follow links, so `is_dir` is only
            // true for real directories.
            if meta.is_dir() {
                pending.push(child_path);
            } else {
                total_bytes = total_bytes.saturating_add(meta.len());
            }
        }
    }

    (total_bytes, inode_count)
}
// Unit tests are declared out of line and compiled only for test builds.
#[cfg(test)]
mod tests;