use std::os::fd::AsFd;
use std::os::unix::io::OwnedFd;
use std::os::unix::net::UnixStream as StdUnixStream;
use std::path::Path;
use std::process::{Child, Command, Stdio};
use base64::prelude::*;
use jsonrpc_fdpass::transport::UnixSocketTransport;
use jsonrpc_fdpass::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, MessageWithFds};
use rustix::io::dup;
use rustix::process::{Signal, set_parent_process_death_signal};
use serde::{Deserialize, Serialize};
use tokio::net::UnixStream as TokioUnixStream;
use crate::layer::Layer;
use crate::storage::Storage;
use crate::tar_split::{TarSplitFdStream, TarSplitItem};
use crate::userns::can_bypass_file_permissions;
const HELPER_ENV: &str = "__CSTORAGE_USERNS_HELPER";
mod error_codes {
pub const INVALID_PARAMS: i32 = -32602;
pub const METHOD_NOT_FOUND: i32 = -32601;
pub const RESOURCE_NOT_FOUND: i32 = -32000;
pub const INTERNAL_ERROR: i32 = -32003;
}
mod methods {
pub const OPEN_FILE: &str = "userns.openFile";
pub const SHUTDOWN: &str = "userns.shutdown";
pub const LIST_IMAGES: &str = "userns.listImages";
pub const GET_IMAGE: &str = "userns.getImage";
pub const STREAM_LAYER: &str = "userns.streamLayer";
}
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenFileParams {
pub path: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenFileResult {
pub success: bool,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ListImagesParams {
pub storage_path: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImageInfo {
pub id: String,
pub names: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ListImagesResult {
pub images: Vec<ImageInfo>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct GetImageParams {
pub storage_path: String,
pub image_ref: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct GetImageResult {
pub id: String,
pub names: Vec<String>,
pub layer_diff_ids: Vec<oci_spec::image::Digest>,
pub storage_layer_ids: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamLayerParams {
pub storage_path: String,
pub layer_id: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamSegmentNotification {
pub data: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamFileNotification {
pub name: String,
pub size: u64,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamLayerResult {
pub items_sent: usize,
}
#[derive(Debug, thiserror::Error)]
pub enum HelperError {
#[error("failed to create socket: {0}")]
Socket(#[source] std::io::Error),
#[error("failed to spawn helper process: {0}")]
Spawn(#[source] std::io::Error),
#[error("IPC error: {0}")]
Ipc(String),
#[error("helper error: {0}")]
HelperError(String),
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("RPC error: code={code}, message={message}")]
RpcError {
code: i32,
message: String,
},
}
pub fn init_if_helper() {
if std::env::var(HELPER_ENV).is_err() {
return; }
if let Err(e) = set_parent_process_death_signal(Some(Signal::TERM)) {
eprintln!("cstorage helper: failed to set parent death signal: {}", e);
}
let stdin_fd = match dup(std::io::stdin().as_fd()) {
Ok(fd) => fd,
Err(e) => {
eprintln!("cstorage helper: failed to dup stdin: {}", e);
std::process::exit(1);
}
};
let std_socket = StdUnixStream::from(stdin_fd);
if let Err(e) = run_helper_loop_blocking(std_socket) {
eprintln!("cstorage helper: error in helper loop: {}", e);
std::process::exit(1);
}
std::process::exit(0);
}
fn run_helper_loop_blocking(std_socket: StdUnixStream) -> std::result::Result<(), HelperError> {
std_socket.set_nonblocking(true)?;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| HelperError::Ipc(format!("failed to create tokio runtime: {}", e)))?;
rt.block_on(run_helper_loop_async(std_socket))
}
async fn run_helper_loop_async(std_socket: StdUnixStream) -> std::result::Result<(), HelperError> {
let tokio_socket = TokioUnixStream::from_std(std_socket)
.map_err(|e| HelperError::Ipc(format!("failed to convert socket: {}", e)))?;
let transport = UnixSocketTransport::new(tokio_socket);
let (mut sender, mut receiver) = transport.split();
tracing::debug!("userns helper: starting request loop");
loop {
let msg_with_fds = match receiver.receive().await {
Ok(m) => m,
Err(jsonrpc_fdpass::Error::ConnectionClosed) => {
tracing::debug!("userns helper: connection closed");
return Ok(());
}
Err(e) => {
return Err(HelperError::Ipc(format!(
"failed to receive message: {}",
e
)));
}
};
match msg_with_fds.message {
JsonRpcMessage::Request(request) => {
let id = request.id.clone();
if request.method == methods::STREAM_LAYER {
if let Err((code, msg)) = handle_stream_layer(&request, &mut sender).await {
let error = jsonrpc_fdpass::JsonRpcError::owned(code, msg, None::<()>);
let response = JsonRpcResponse::error(error, id);
let message =
MessageWithFds::new(JsonRpcMessage::Response(response), vec![]);
sender.send(message).await.map_err(|e| {
HelperError::Ipc(format!("failed to send error response: {}", e))
})?;
}
continue;
}
let (result, fds) = handle_request(&request);
match result {
Ok(response_value) => {
let response = JsonRpcResponse::success(response_value, id);
let message = MessageWithFds::new(JsonRpcMessage::Response(response), fds);
sender.send(message).await.map_err(|e| {
HelperError::Ipc(format!("failed to send response: {}", e))
})?;
}
Err((code, message_str)) => {
let error =
jsonrpc_fdpass::JsonRpcError::owned(code, message_str, None::<()>);
let response = JsonRpcResponse::error(error, id);
let message =
MessageWithFds::new(JsonRpcMessage::Response(response), vec![]);
sender.send(message).await.map_err(|e| {
HelperError::Ipc(format!("failed to send error response: {}", e))
})?;
}
}
if request.method == methods::SHUTDOWN {
tracing::debug!("userns helper: received shutdown request");
return Ok(());
}
}
JsonRpcMessage::Notification(notif) => {
if notif.method == methods::SHUTDOWN {
tracing::debug!("userns helper: received shutdown notification");
return Ok(());
}
}
JsonRpcMessage::Response(_) => {
}
}
}
}
async fn handle_stream_layer(
request: &JsonRpcRequest,
sender: &mut jsonrpc_fdpass::transport::Sender,
) -> std::result::Result<(), (i32, String)> {
let params: StreamLayerParams = request
.params
.as_ref()
.and_then(|p| serde_json::from_value(p.clone()).ok())
.ok_or((
error_codes::INVALID_PARAMS,
"invalid params for streamLayer".to_string(),
))?;
let storage = Storage::open(¶ms.storage_path).map_err(|e| {
(
error_codes::INTERNAL_ERROR,
format!("failed to open storage: {}", e),
)
})?;
let layer = Layer::open(&storage, ¶ms.layer_id).map_err(|e| {
(
error_codes::RESOURCE_NOT_FOUND,
format!("layer not found: {}", e),
)
})?;
let mut stream = TarSplitFdStream::new(&storage, &layer).map_err(|e| {
(
error_codes::INTERNAL_ERROR,
format!("failed to create tar-split stream: {}", e),
)
})?;
let mut items_sent = 0usize;
while let Some(item) = stream
.next()
.map_err(|e| (error_codes::INTERNAL_ERROR, format!("stream error: {}", e)))?
{
match item {
TarSplitItem::Segment(bytes) => {
let params = StreamSegmentNotification {
data: BASE64_STANDARD.encode(&bytes),
};
let notif = jsonrpc_fdpass::JsonRpcNotification::new(
"stream.segment".to_string(),
Some(serde_json::to_value(¶ms).unwrap()),
);
let message = MessageWithFds::new(JsonRpcMessage::Notification(notif), vec![]);
sender.send(message).await.map_err(|e| {
(
error_codes::INTERNAL_ERROR,
format!("failed to send segment: {}", e),
)
})?;
items_sent += 1;
}
TarSplitItem::FileContent { fd, size, name } => {
let params = StreamFileNotification { name, size };
let notif = jsonrpc_fdpass::JsonRpcNotification::new(
"stream.file".to_string(),
Some(serde_json::to_value(¶ms).unwrap()),
);
let message = MessageWithFds::new(JsonRpcMessage::Notification(notif), vec![fd]);
sender.send(message).await.map_err(|e| {
(
error_codes::INTERNAL_ERROR,
format!("failed to send file: {}", e),
)
})?;
items_sent += 1;
}
}
}
let result = StreamLayerResult { items_sent };
let response =
JsonRpcResponse::success(serde_json::to_value(result).unwrap(), request.id.clone());
let message = MessageWithFds::new(JsonRpcMessage::Response(response), vec![]);
sender.send(message).await.map_err(|e| {
(
error_codes::INTERNAL_ERROR,
format!("failed to send response: {}", e),
)
})?;
Ok(())
}
fn handle_request(
request: &JsonRpcRequest,
) -> (
std::result::Result<serde_json::Value, (i32, String)>,
Vec<OwnedFd>,
) {
match request.method.as_str() {
methods::OPEN_FILE => {
let params: OpenFileParams = match request
.params
.as_ref()
.and_then(|p| serde_json::from_value(p.clone()).ok())
{
Some(p) => p,
None => {
return (
Err((
error_codes::INVALID_PARAMS,
"invalid params: missing 'path' field".to_string(),
)),
vec![],
);
}
};
match std::fs::File::open(¶ms.path) {
Ok(file) => {
let fd: OwnedFd = file.into();
let result = OpenFileResult { success: true };
(Ok(serde_json::to_value(result).unwrap()), vec![fd])
}
Err(e) => (
Err((
error_codes::INTERNAL_ERROR,
format!("failed to open file: {}", e),
)),
vec![],
),
}
}
methods::LIST_IMAGES => handle_list_images(request),
methods::GET_IMAGE => handle_get_image(request),
methods::SHUTDOWN => {
(Ok(serde_json::json!({"success": true})), vec![])
}
_ => (
Err((
error_codes::METHOD_NOT_FOUND,
format!("method not found: {}", request.method),
)),
vec![],
),
}
}
fn handle_list_images(
request: &JsonRpcRequest,
) -> (
std::result::Result<serde_json::Value, (i32, String)>,
Vec<OwnedFd>,
) {
let params: ListImagesParams = match request
.params
.as_ref()
.and_then(|p| serde_json::from_value(p.clone()).ok())
{
Some(p) => p,
None => {
return (
Err((
error_codes::INVALID_PARAMS,
"invalid params for listImages".to_string(),
)),
vec![],
);
}
};
let storage = match Storage::open(¶ms.storage_path) {
Ok(s) => s,
Err(e) => {
return (
Err((
error_codes::INTERNAL_ERROR,
format!("failed to open storage: {}", e),
)),
vec![],
);
}
};
let images = match storage.list_images() {
Ok(imgs) => imgs,
Err(e) => {
return (
Err((
error_codes::INTERNAL_ERROR,
format!("failed to list images: {}", e),
)),
vec![],
);
}
};
let image_infos: Vec<ImageInfo> = images
.iter()
.map(|img| ImageInfo {
id: img.id().to_string(),
names: img.names(&storage).unwrap_or_default(),
})
.collect();
let result = ListImagesResult {
images: image_infos,
};
(Ok(serde_json::to_value(result).unwrap()), vec![])
}
fn handle_get_image(
request: &JsonRpcRequest,
) -> (
std::result::Result<serde_json::Value, (i32, String)>,
Vec<OwnedFd>,
) {
let params: GetImageParams = match request
.params
.as_ref()
.and_then(|p| serde_json::from_value(p.clone()).ok())
{
Some(p) => p,
None => {
return (
Err((
error_codes::INVALID_PARAMS,
"invalid params for getImage".to_string(),
)),
vec![],
);
}
};
let storage = match Storage::open(¶ms.storage_path) {
Ok(s) => s,
Err(e) => {
return (
Err((
error_codes::INTERNAL_ERROR,
format!("failed to open storage: {}", e),
)),
vec![],
);
}
};
let image = match crate::image::Image::open(&storage, ¶ms.image_ref) {
Ok(img) => img,
Err(_) => match storage.find_image_by_name(¶ms.image_ref) {
Ok(img) => img,
Err(e) => {
return (
Err((
error_codes::RESOURCE_NOT_FOUND,
format!("image not found: {}", e),
)),
vec![],
);
}
},
};
let config = match image.config() {
Ok(cfg) => cfg,
Err(e) => {
return (
Err((
error_codes::INTERNAL_ERROR,
format!("failed to read config: {}", e),
)),
vec![],
);
}
};
let diff_ids: Vec<oci_spec::image::Digest> = config
.rootfs()
.diff_ids()
.iter()
.map(|s| s.parse().expect("config diff_id should be valid digest"))
.collect();
let storage_layer_ids = match image.storage_layer_ids(std::slice::from_ref(&storage)) {
Ok(ids) => ids,
Err(e) => {
return (
Err((
error_codes::INTERNAL_ERROR,
format!("failed to get storage layer IDs: {}", e),
)),
vec![],
);
}
};
let result = GetImageResult {
id: image.id().to_string(),
names: image.names(&storage).unwrap_or_default(),
layer_diff_ids: diff_ids,
storage_layer_ids,
};
(Ok(serde_json::to_value(result).unwrap()), vec![])
}
pub struct StorageProxy {
child: Child,
sender: jsonrpc_fdpass::transport::Sender,
receiver: jsonrpc_fdpass::transport::Receiver,
next_id: u64,
}
impl std::fmt::Debug for StorageProxy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StorageProxy")
.field("child_pid", &self.child.id())
.finish_non_exhaustive()
}
}
impl StorageProxy {
pub async fn spawn() -> std::result::Result<Option<Self>, HelperError> {
if can_bypass_file_permissions() {
return Ok(None);
}
Self::spawn_helper().await.map(Some)
}
async fn spawn_helper() -> std::result::Result<Self, HelperError> {
let exe = std::fs::read_link("/proc/self/exe").map_err(HelperError::Io)?;
Self::spawn_helper_with_binary(exe).await
}
async fn spawn_helper_with_binary(
exe: std::path::PathBuf,
) -> std::result::Result<Self, HelperError> {
let (parent_sock, child_sock) = StdUnixStream::pair().map_err(HelperError::Socket)?;
let child = Command::new("podman")
.arg("unshare")
.arg("env")
.arg(format!("{}=1", HELPER_ENV))
.arg(&exe)
.stdin(Stdio::from(OwnedFd::from(child_sock)))
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.map_err(HelperError::Spawn)?;
parent_sock.set_nonblocking(true)?;
let tokio_socket = TokioUnixStream::from_std(parent_sock)
.map_err(|e| HelperError::Ipc(format!("failed to convert socket: {}", e)))?;
let transport = UnixSocketTransport::new(tokio_socket);
let (sender, receiver) = transport.split();
Ok(Self {
child,
sender,
receiver,
next_id: 1,
})
}
pub async fn open_file(
&mut self,
path: impl AsRef<Path>,
) -> std::result::Result<OwnedFd, HelperError> {
let params = OpenFileParams {
path: path.as_ref().to_string_lossy().to_string(),
};
let id = self.next_id;
self.next_id += 1;
let request = JsonRpcRequest::new(
methods::OPEN_FILE.to_string(),
Some(serde_json::to_value(¶ms).unwrap()),
serde_json::Value::Number(id.into()),
);
let message = MessageWithFds::new(JsonRpcMessage::Request(request), vec![]);
self.sender
.send(message)
.await
.map_err(|e| HelperError::Ipc(format!("failed to send request: {}", e)))?;
let response = self
.receiver
.receive()
.await
.map_err(|e| HelperError::Ipc(format!("failed to receive response: {}", e)))?;
match response.message {
JsonRpcMessage::Response(resp) => {
if let Some(error) = resp.error {
return Err(HelperError::RpcError {
code: error.code(),
message: error.message().to_string(),
});
}
if response.file_descriptors.is_empty() {
return Err(HelperError::Ipc(
"response missing file descriptor".to_string(),
));
}
Ok(response.file_descriptors.into_iter().next().unwrap())
}
other => Err(HelperError::Ipc(format!(
"unexpected message type: {:?}",
other
))),
}
}
pub async fn shutdown(mut self) -> std::result::Result<(), HelperError> {
let id = self.next_id;
let request = JsonRpcRequest::new(
methods::SHUTDOWN.to_string(),
None,
serde_json::Value::Number(id.into()),
);
let message = MessageWithFds::new(JsonRpcMessage::Request(request), vec![]);
let _ = self.sender.send(message).await;
let _ = self.child.wait();
Ok(())
}
pub async fn list_images(
&mut self,
storage_path: &str,
) -> std::result::Result<Vec<ImageInfo>, HelperError> {
let params = ListImagesParams {
storage_path: storage_path.to_string(),
};
let result: ListImagesResult = self.call(methods::LIST_IMAGES, ¶ms).await?;
Ok(result.images)
}
pub async fn get_image(
&mut self,
storage_path: &str,
image_ref: &str,
) -> std::result::Result<GetImageResult, HelperError> {
let params = GetImageParams {
storage_path: storage_path.to_string(),
image_ref: image_ref.to_string(),
};
self.call(methods::GET_IMAGE, ¶ms).await
}
pub async fn stream_layer(
&mut self,
storage_path: &str,
layer_id: &str,
) -> std::result::Result<ProxiedLayerStream<'_>, HelperError> {
let params = StreamLayerParams {
storage_path: storage_path.to_string(),
layer_id: layer_id.to_string(),
};
let id = self.next_id;
self.next_id += 1;
let request = JsonRpcRequest::new(
methods::STREAM_LAYER.to_string(),
Some(serde_json::to_value(¶ms).unwrap()),
serde_json::Value::Number(id.into()),
);
let message = MessageWithFds::new(JsonRpcMessage::Request(request), vec![]);
self.sender
.send(message)
.await
.map_err(|e| HelperError::Ipc(format!("failed to send stream_layer request: {}", e)))?;
Ok(ProxiedLayerStream {
receiver: &mut self.receiver,
request_id: id,
finished: false,
})
}
async fn call<P: Serialize, R: for<'de> Deserialize<'de>>(
&mut self,
method: &str,
params: &P,
) -> std::result::Result<R, HelperError> {
let id = self.next_id;
self.next_id += 1;
let request = JsonRpcRequest::new(
method.to_string(),
Some(serde_json::to_value(params).unwrap()),
serde_json::Value::Number(id.into()),
);
let message = MessageWithFds::new(JsonRpcMessage::Request(request), vec![]);
self.sender
.send(message)
.await
.map_err(|e| HelperError::Ipc(format!("failed to send request: {}", e)))?;
let response = self
.receiver
.receive()
.await
.map_err(|e| HelperError::Ipc(format!("failed to receive response: {}", e)))?;
match response.message {
JsonRpcMessage::Response(resp) => {
if let Some(error) = resp.error {
return Err(HelperError::RpcError {
code: error.code(),
message: error.message().to_string(),
});
}
let result = resp
.result
.ok_or_else(|| HelperError::Ipc("response missing result".to_string()))?;
serde_json::from_value(result)
.map_err(|e| HelperError::Ipc(format!("failed to parse result: {}", e)))
}
other => Err(HelperError::Ipc(format!(
"unexpected message type: {:?}",
other
))),
}
}
}
#[derive(Debug)]
pub enum ProxiedTarSplitItem {
Segment(Vec<u8>),
FileContent {
fd: OwnedFd,
size: u64,
name: String,
},
}
pub struct ProxiedLayerStream<'a> {
receiver: &'a mut jsonrpc_fdpass::transport::Receiver,
request_id: u64,
finished: bool,
}
impl std::fmt::Debug for ProxiedLayerStream<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProxiedLayerStream")
.field("request_id", &self.request_id)
.field("finished", &self.finished)
.finish_non_exhaustive()
}
}
impl<'a> ProxiedLayerStream<'a> {
pub async fn next(&mut self) -> std::result::Result<Option<ProxiedTarSplitItem>, HelperError> {
if self.finished {
return Ok(None);
}
let msg_with_fds = match self.receiver.receive().await {
Ok(m) => m,
Err(jsonrpc_fdpass::Error::ConnectionClosed) => {
self.finished = true;
return Ok(None);
}
Err(e) => {
return Err(HelperError::Ipc(format!("failed to receive: {}", e)));
}
};
let mut fds = msg_with_fds.file_descriptors;
match msg_with_fds.message {
JsonRpcMessage::Notification(notif) => {
let params = notif.params.unwrap_or(serde_json::Value::Null);
match notif.method.as_str() {
"stream.segment" => {
let seg: StreamSegmentNotification = serde_json::from_value(params)
.map_err(|e| {
HelperError::Ipc(format!("invalid segment params: {}", e))
})?;
let bytes = BASE64_STANDARD.decode(&seg.data).map_err(|e| {
HelperError::Ipc(format!("failed to decode segment: {}", e))
})?;
Ok(Some(ProxiedTarSplitItem::Segment(bytes)))
}
"stream.file" => {
let file: StreamFileNotification = serde_json::from_value(params)
.map_err(|e| HelperError::Ipc(format!("invalid file params: {}", e)))?;
if fds.is_empty() {
return Err(HelperError::Ipc(
"file notification missing fd".to_string(),
));
}
let fd = fds.remove(0);
Ok(Some(ProxiedTarSplitItem::FileContent {
fd,
size: file.size,
name: file.name,
}))
}
other => Err(HelperError::Ipc(format!(
"unknown notification method: {}",
other
))),
}
}
JsonRpcMessage::Response(resp) => {
self.finished = true;
if let Some(error) = resp.error {
return Err(HelperError::RpcError {
code: error.code(),
message: error.message().to_string(),
});
}
Ok(None)
}
JsonRpcMessage::Request(_) => Err(HelperError::Ipc(
"unexpected request from helper".to_string(),
)),
}
}
}
impl Drop for StorageProxy {
fn drop(&mut self) {
let _ = self.child.kill();
}
}