use std::{path::Path, sync::Arc};
use bytes::Bytes;
use microsandbox_protocol::{
fs::{FsData, FsEntryInfo, FsOpenOptions, FsResponse},
message::{Message, MessageType},
};
use tokio::sync::mpsc;
use crate::{
MicrosandboxError, MicrosandboxResult,
agent::AgentClient,
backend::{Backend, LocalBackend},
};
/// Filesystem view of a single sandbox.
///
/// Pairs a backend with a sandbox name; the methods on this type forward each
/// operation to that backend for the named sandbox.
pub struct SandboxFs<'a> {
/// Backend that carries out the filesystem operations.
backend: Arc<dyn Backend>,
/// Name of the sandbox being operated on, borrowed from the caller.
name: &'a str,
}
/// Numeric identifier of a file handle, as returned by the agent in response
/// to an open-file request and later passed to read/write/close requests.
pub type FsHandle = u64;
/// One directory entry, converted from the protocol's `FsEntryInfo`.
#[derive(Debug, Clone)]
pub struct FsEntry {
/// Path string copied verbatim from the protocol entry.
pub path: String,
/// Entry type, parsed from the protocol's textual kind tag.
pub kind: FsEntryKind,
/// Size copied from the protocol entry (presumably bytes — confirm against the agent).
pub size: u64,
/// Mode bits copied from the protocol entry.
pub mode: u32,
/// Modification time, converted from the protocol's whole-second Unix timestamp when present.
pub modified: Option<chrono::DateTime<chrono::Utc>>,
}
/// Type of a filesystem entry, parsed from the protocol's textual kind tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FsEntryKind {
/// Protocol tag `"file"`.
File,
/// Protocol tag `"dir"`.
Directory,
/// Protocol tag `"symlink"`.
Symlink,
/// Any tag that is not one of the three above.
Other,
}
/// Metadata for a single path, converted from the protocol's `FsEntryInfo`.
#[derive(Debug, Clone)]
pub struct FsMetadata {
/// Entry type, parsed from the protocol's textual kind tag.
pub kind: FsEntryKind,
/// Size copied from the protocol entry (presumably bytes — confirm against the agent).
pub size: u64,
/// Mode bits copied from the protocol entry.
pub mode: u32,
/// `true` when the `0o200` (owner-write) bit of `mode` is clear.
pub readonly: bool,
/// Modification time, converted from the protocol's whole-second Unix timestamp when present.
pub modified: Option<chrono::DateTime<chrono::Utc>>,
/// Creation time; the conversion in this file always leaves it as `None`.
pub created: Option<chrono::DateTime<chrono::Utc>>,
}
/// Incremental reader over the messages of a streamed file read.
pub struct FsReadStream {
/// Receives the `FsData` / `FsResponse` messages belonging to this read.
rx: mpsc::Receiver<Message>,
/// Agent client stored alongside the receiver; never read in this file.
/// NOTE(review): presumably held only to keep the connection open while the
/// stream is in use — confirm.
_client: Option<Arc<AgentClient>>,
}
/// Incremental writer that pushes `FsData` messages for a streamed file write.
pub struct FsWriteSink {
/// Correlation id passed to every `send` call for this write.
id: u32,
/// Agent client used to send data and to close the handle.
client: Arc<AgentClient>,
/// Receives the agent's response once the write has been finished.
rx: mpsc::Receiver<Message>,
/// File handle to close when the sink is closed, if any.
close_handle: Option<FsHandle>,
}
impl<'a> SandboxFs<'a> {
pub(crate) fn new(backend: Arc<dyn Backend>, name: &'a str) -> Self {
Self { backend, name }
}
pub fn with_backend(backend: Arc<dyn Backend>, name: &'a str) -> Self {
Self { backend, name }
}
pub async fn read(&self, path: &str) -> MicrosandboxResult<Bytes> {
self.backend
.sandboxes()
.fs_read(self.backend.clone(), self.name, path)
.await
}
pub async fn read_to_string(&self, path: &str) -> MicrosandboxResult<String> {
let data = self.read(path).await?;
String::from_utf8(Vec::from(data))
.map_err(|e| MicrosandboxError::SandboxFsOps(format!("invalid utf-8: {e}")))
}
pub async fn read_stream(&self, path: &str) -> MicrosandboxResult<FsReadStream> {
self.backend
.sandboxes()
.fs_read_stream(self.backend.clone(), self.name, path)
.await
}
pub async fn write(&self, path: &str, data: impl AsRef<[u8]>) -> MicrosandboxResult<()> {
self.backend
.sandboxes()
.fs_write(
self.backend.clone(),
self.name,
path,
data.as_ref().to_vec(),
)
.await
}
pub async fn write_stream(&self, path: &str) -> MicrosandboxResult<FsWriteSink> {
self.backend
.sandboxes()
.fs_write_stream(self.backend.clone(), self.name, path)
.await
}
pub async fn list(&self, path: &str) -> MicrosandboxResult<Vec<FsEntry>> {
self.backend
.sandboxes()
.fs_list(self.backend.clone(), self.name, path)
.await
}
pub async fn mkdir(&self, path: &str) -> MicrosandboxResult<()> {
self.backend
.sandboxes()
.fs_mkdir(self.backend.clone(), self.name, path)
.await
}
pub async fn remove_dir(&self, path: &str) -> MicrosandboxResult<()> {
self.backend
.sandboxes()
.fs_remove(self.backend.clone(), self.name, path, true)
.await
}
pub async fn remove(&self, path: &str) -> MicrosandboxResult<()> {
self.backend
.sandboxes()
.fs_remove(self.backend.clone(), self.name, path, false)
.await
}
pub async fn copy(&self, from: &str, to: &str) -> MicrosandboxResult<()> {
self.backend
.sandboxes()
.fs_copy(self.backend.clone(), self.name, from, to)
.await
}
pub async fn rename(&self, from: &str, to: &str) -> MicrosandboxResult<()> {
self.backend
.sandboxes()
.fs_rename(self.backend.clone(), self.name, from, to)
.await
}
pub async fn stat(&self, path: &str) -> MicrosandboxResult<FsMetadata> {
self.backend
.sandboxes()
.fs_stat(self.backend.clone(), self.name, path)
.await
}
pub async fn stat_with_follow(
&self,
path: &str,
follow_symlink: bool,
) -> MicrosandboxResult<FsMetadata> {
let local = self.local_backend("SandboxFs::stat_with_follow")?;
local::stat_with_follow(local, self.name, path, follow_symlink).await
}
pub async fn set_stat(
&self,
path: &str,
follow_symlink: bool,
attrs: FsSetAttrs,
) -> MicrosandboxResult<()> {
let local = self.local_backend("SandboxFs::set_stat")?;
local::set_stat(local, self.name, path, follow_symlink, attrs).await
}
pub async fn read_link(&self, path: &str) -> MicrosandboxResult<String> {
let local = self.local_backend("SandboxFs::read_link")?;
local::read_link(local, self.name, path).await
}
pub async fn symlink(&self, target: &str, link_path: &str) -> MicrosandboxResult<()> {
let local = self.local_backend("SandboxFs::symlink")?;
local::symlink(local, self.name, target, link_path).await
}
pub async fn exists(&self, path: &str) -> MicrosandboxResult<bool> {
self.backend
.sandboxes()
.fs_exists(self.backend.clone(), self.name, path)
.await
}
pub async fn copy_from_host(
&self,
host_path: impl AsRef<Path>,
guest_path: &str,
) -> MicrosandboxResult<()> {
self.backend
.sandboxes()
.fs_copy_from_host(
self.backend.clone(),
self.name,
host_path.as_ref(),
guest_path,
)
.await
}
pub async fn copy_to_host(
&self,
guest_path: &str,
host_path: impl AsRef<Path>,
) -> MicrosandboxResult<()> {
self.backend
.sandboxes()
.fs_copy_to_host(
self.backend.clone(),
self.name,
guest_path,
host_path.as_ref(),
)
.await
}
fn local_backend(&self, method: &'static str) -> MicrosandboxResult<&LocalBackend> {
self.backend
.as_local()
.ok_or_else(|| MicrosandboxError::Unsupported {
feature: method.into(),
available_when: "when cloud guest-fs lands".into(),
})
}
}
impl FsReadStream {
    /// Builds a stream over `rx` that also stores `client` alongside it.
    pub(crate) fn with_client(rx: mpsc::Receiver<Message>, client: Arc<AgentClient>) -> Self {
        let _client = Some(client);
        Self { rx, _client }
    }

    /// Waits for the next non-empty data chunk.
    ///
    /// Returns `Ok(Some(bytes))` for a non-empty `FsData` message. Returns
    /// `Ok(None)` on a successful `FsResponse`, and also when the channel
    /// closes without one. Empty chunks and other message types are skipped.
    ///
    /// # Errors
    ///
    /// Returns `MicrosandboxError::SandboxFsOps` when an `FsResponse` reports
    /// failure, and propagates payload decoding errors.
    pub async fn recv(&mut self) -> MicrosandboxResult<Option<Bytes>> {
        loop {
            let msg = match self.rx.recv().await {
                Some(msg) => msg,
                None => return Ok(None),
            };
            match msg.t {
                MessageType::FsData => {
                    let chunk: FsData = msg.payload()?;
                    if chunk.data.is_empty() {
                        continue;
                    }
                    return Ok(Some(Bytes::from(chunk.data)));
                }
                MessageType::FsResponse => {
                    let resp: FsResponse = msg.payload()?;
                    return if resp.ok {
                        Ok(None)
                    } else {
                        Err(MicrosandboxError::SandboxFsOps(
                            resp.error.unwrap_or_else(|| "unknown error".into()),
                        ))
                    };
                }
                _ => continue,
            }
        }
    }

    /// Drains the stream and returns all chunks concatenated in arrival order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced by [`FsReadStream::recv`].
    pub async fn collect(mut self) -> MicrosandboxResult<Bytes> {
        let mut gathered = Vec::new();
        loop {
            match self.recv().await? {
                Some(chunk) => gathered.extend_from_slice(&chunk),
                None => break,
            }
        }
        Ok(Bytes::from(gathered))
    }
}
impl FsWriteSink {
pub(crate) fn new(
id: u32,
client: Arc<AgentClient>,
rx: mpsc::Receiver<Message>,
close_handle: Option<FsHandle>,
) -> Self {
Self {
id,
client,
rx,
close_handle,
}
}
pub async fn write(&self, data: impl AsRef<[u8]>) -> MicrosandboxResult<()> {
let fs_data = FsData {
data: data.as_ref().to_vec(),
};
self.client
.send(self.id, MessageType::FsData, &fs_data)
.await
.map_err(Into::into)
}
pub async fn close(mut self) -> MicrosandboxResult<()> {
let eof = FsData { data: Vec::new() };
self.client.send(self.id, MessageType::FsData, &eof).await?;
let result = wait_for_ok_response(&mut self.rx).await;
if let Some(handle) = self.close_handle.take() {
let _ = local::close_handle(&self.client, handle).await;
}
result
}
}
/// Maps the protocol's textual kind tag to an [`FsEntryKind`].
///
/// `"file"`, `"dir"` and `"symlink"` map to their variants; every other tag
/// becomes [`FsEntryKind::Other`]. Matching is exact and case-sensitive.
fn parse_kind(s: &str) -> FsEntryKind {
    const KNOWN_TAGS: [(&str, FsEntryKind); 3] = [
        ("file", FsEntryKind::File),
        ("dir", FsEntryKind::Directory),
        ("symlink", FsEntryKind::Symlink),
    ];
    KNOWN_TAGS
        .iter()
        .find(|(tag, _)| *tag == s)
        .map_or(FsEntryKind::Other, |&(_, kind)| kind)
}
/// Converts an optional whole-second Unix timestamp into a UTC date-time.
///
/// Returns `None` when `ts` is `None` and also when the value is outside the
/// range chrono can represent. Substituting the default date-time for an
/// unrepresentable value would report a fabricated 1970-01-01 modification
/// time.
fn parse_modified(ts: Option<i64>) -> Option<chrono::DateTime<chrono::Utc>> {
    ts.and_then(|secs| chrono::DateTime::from_timestamp(secs, 0))
}
/// Converts a protocol `FsEntryInfo` into the public [`FsEntry`].
///
/// `kind` and `modified` are parsed; `path`, `size` and `mode` are carried
/// over unchanged.
fn entry_info_to_fs_entry(info: FsEntryInfo) -> FsEntry {
    // Parse the borrowed fields first, before `path` is moved out of `info`.
    let kind = parse_kind(&info.kind);
    let modified = parse_modified(info.modified);
    FsEntry {
        path: info.path,
        kind,
        size: info.size,
        mode: info.mode,
        modified,
    }
}
/// Builds an [`FsMetadata`] from a borrowed protocol `FsEntryInfo`.
///
/// `readonly` is derived from the mode: it is `true` when the `0o200` bit is
/// clear. `created` is always `None`.
fn entry_info_to_metadata(info: &FsEntryInfo) -> FsMetadata {
    let write_bit_set = info.mode & 0o200 != 0;
    let kind = parse_kind(&info.kind);
    let modified = parse_modified(info.modified);
    FsMetadata {
        kind,
        size: info.size,
        mode: info.mode,
        readonly: !write_bit_set,
        modified,
        created: None,
    }
}
/// Decodes `msg` as an `FsResponse` and turns its status into a `Result`.
///
/// # Errors
///
/// Propagates payload decoding errors. Returns
/// `MicrosandboxError::SandboxFsOps` carrying the agent's error text (or
/// `"unknown error"` when none was supplied) if the response is not ok.
fn check_response(msg: Message) -> MicrosandboxResult<()> {
    let resp: FsResponse = msg.payload()?;
    if !resp.ok {
        let reason = resp.error.unwrap_or_else(|| "unknown error".into());
        return Err(MicrosandboxError::SandboxFsOps(reason));
    }
    Ok(())
}
/// Reads from `rx` until an `FsResponse` arrives and returns its status.
///
/// Messages of any other type are discarded.
///
/// # Errors
///
/// Returns whatever `check_response` yields for the response, or
/// `MicrosandboxError::SandboxFsOps` if the channel closes first.
async fn wait_for_ok_response(rx: &mut mpsc::Receiver<Message>) -> MicrosandboxResult<()> {
    loop {
        match rx.recv().await {
            Some(msg) if msg.t == MessageType::FsResponse => return check_response(msg),
            Some(_) => continue,
            None => {
                return Err(MicrosandboxError::SandboxFsOps(
                    "channel closed before response".into(),
                ));
            }
        }
    }
}
fn read_only_open_options() -> FsOpenOptions {
FsOpenOptions {
read: true,
..Default::default()
}
}
fn write_open_options() -> FsOpenOptions {
FsOpenOptions {
write: true,
create: true,
truncate: true,
..Default::default()
}
}
pub(crate) mod local {
use std::path::Path;
use std::sync::Arc;
use bytes::Bytes;
use microsandbox_protocol::{
fs::{
FS_CHUNK_SIZE, FsData, FsOp, FsOpenOptions, FsRequest, FsResponse, FsResponseData,
FsSetAttrs,
},
message::MessageType,
};
use tokio::io::AsyncReadExt;
use crate::{MicrosandboxError, MicrosandboxResult, agent::AgentClient, backend::LocalBackend};
use super::{
FsEntry, FsHandle, FsMetadata, FsReadStream, FsWriteSink, check_response,
entry_info_to_fs_entry, entry_info_to_metadata, wait_for_ok_response,
};
/// Connects to the agent of sandbox `name` using a 10-second timeout.
pub(crate) async fn connect_agent(
    local: &LocalBackend,
    name: &str,
) -> MicrosandboxResult<AgentClient> {
    let default_timeout = std::time::Duration::from_secs(10);
    connect_agent_with_timeout(local, name, default_timeout).await
}
/// Tries each candidate agent socket path for sandbox `name` in order and
/// returns the first client that connects.
///
/// Candidates for which `agent_endpoint_may_exist` is false are skipped.
/// `timeout` is passed to every connection attempt.
///
/// # Errors
///
/// Returns the error from the last failed attempt, or
/// `MicrosandboxError::Runtime` when no candidate was attempted at all.
pub(crate) async fn connect_agent_with_timeout(
    local: &LocalBackend,
    name: &str,
    timeout: std::time::Duration,
) -> MicrosandboxResult<AgentClient> {
    let mut most_recent_failure = None;
    for sock_path in crate::runtime::sandbox_agent_socket_path_candidates_for(local, name) {
        if agent_endpoint_may_exist(&sock_path) {
            match AgentClient::connect_with_timeout(&sock_path, timeout).await {
                Ok(client) => return Ok(client),
                Err(error) => most_recent_failure = Some(error),
            }
        }
    }
    if let Some(error) = most_recent_failure {
        return Err(error.into());
    }
    Err(MicrosandboxError::Runtime(format!(
        "no agent endpoint found for sandbox {name:?}"
    )))
}
/// Cheap pre-check used before attempting a connection to `path`.
///
/// On Unix this reports whether filesystem metadata for `path` can be read,
/// so a missing or inaccessible path yields `false`.
#[cfg(unix)]
fn agent_endpoint_may_exist(path: &std::path::Path) -> bool {
    std::fs::metadata(path).is_ok()
}

/// Cheap pre-check used before attempting a connection to `path`.
///
/// On Windows no check is made and every candidate is attempted.
#[cfg(windows)]
fn agent_endpoint_may_exist(_path: &std::path::Path) -> bool {
    true
}
/// Asks the agent to open `path` with `options` and returns the new handle.
///
/// # Errors
///
/// Returns `MicrosandboxError::SandboxFsOps` when the agent reports failure
/// or replies with anything other than a handle; propagates request and
/// decoding errors.
async fn open_file(
    client: &AgentClient,
    path: &str,
    options: FsOpenOptions,
) -> MicrosandboxResult<FsHandle> {
    let op = FsOp::OpenFile {
        path: path.to_string(),
        options,
    };
    let req = FsRequest { op };
    let resp: FsResponse = client
        .request(MessageType::FsRequest, &req)
        .await?
        .payload()?;
    if resp.ok {
        if let Some(FsResponseData::Handle(handle)) = resp.data {
            Ok(handle)
        } else {
            Err(MicrosandboxError::SandboxFsOps(
                "unexpected response data for open".into(),
            ))
        }
    } else {
        Err(MicrosandboxError::SandboxFsOps(
            resp.error.unwrap_or_else(|| "unknown error".into()),
        ))
    }
}
/// Sends a close-handle request for `handle` and checks the agent's reply.
///
/// # Errors
///
/// Propagates request errors and any failure reported in the response.
pub(crate) async fn close_handle(
    client: &AgentClient,
    handle: FsHandle,
) -> MicrosandboxResult<()> {
    let op = FsOp::CloseHandle { handle };
    let reply = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?;
    check_response(reply)
}
pub(crate) async fn read(
local: &LocalBackend,
name: &str,
path: &str,
) -> MicrosandboxResult<Bytes> {
let client = connect_agent(local, name).await?;
let handle = open_file(&client, path, super::read_only_open_options()).await?;
let req = FsRequest {
op: FsOp::Read {
handle,
offset: 0,
len: None,
},
};
let (_id, mut rx) = client.stream(MessageType::FsRequest, &req).await?;
let mut data = Vec::new();
while let Some(msg) = rx.recv().await {
match msg.t {
MessageType::FsData => {
let chunk: FsData = msg.payload()?;
data.extend_from_slice(&chunk.data);
}
MessageType::FsResponse => {
let resp: FsResponse = msg.payload()?;
if !resp.ok {
return Err(MicrosandboxError::SandboxFsOps(
resp.error.unwrap_or_else(|| "unknown error".into()),
));
}
break;
}
_ => {}
}
}
let close_result = close_handle(&client, handle).await;
close_result?;
Ok(Bytes::from(data))
}
pub(crate) async fn read_stream(
local: &LocalBackend,
name: &str,
path: &str,
) -> MicrosandboxResult<FsReadStream> {
let client = Arc::new(connect_agent(local, name).await?);
let handle = open_file(&client, path, super::read_only_open_options()).await?;
let req = FsRequest {
op: FsOp::Read {
handle,
offset: 0,
len: None,
},
};
let (_id, rx) = client.stream(MessageType::FsRequest, &req).await?;
Ok(FsReadStream::with_client(rx, client))
}
pub(crate) async fn write(
local: &LocalBackend,
name: &str,
path: &str,
data: Vec<u8>,
) -> MicrosandboxResult<()> {
let client = connect_agent(local, name).await?;
let handle = open_file(&client, path, super::write_open_options()).await?;
let req = FsRequest {
op: FsOp::Write {
handle,
offset: 0,
len: Some(data.len() as u64),
},
};
let (id, mut rx) = client.stream(MessageType::FsRequest, &req).await?;
for chunk in data.chunks(FS_CHUNK_SIZE) {
let fs_data = FsData {
data: chunk.to_vec(),
};
client.send(id, MessageType::FsData, &fs_data).await?;
}
let eof = FsData { data: Vec::new() };
client.send(id, MessageType::FsData, &eof).await?;
let result = wait_for_ok_response(&mut rx).await;
let _ = close_handle(&client, handle).await;
result
}
pub(crate) async fn write_stream(
local: &LocalBackend,
name: &str,
path: &str,
) -> MicrosandboxResult<FsWriteSink> {
let client = Arc::new(connect_agent(local, name).await?);
let handle = open_file(&client, path, super::write_open_options()).await?;
let req = FsRequest {
op: FsOp::Write {
handle,
offset: 0,
len: None,
},
};
let (id, rx) = client.stream(MessageType::FsRequest, &req).await?;
Ok(FsWriteSink::new(id, client, rx, Some(handle)))
}
/// Requests a listing of `path` inside sandbox `name`.
///
/// A successful response that carries anything other than list data yields
/// an empty vector rather than an error.
///
/// # Errors
///
/// Returns `MicrosandboxError::SandboxFsOps` when the agent reports failure;
/// propagates connection, request and decoding errors.
pub(crate) async fn list(
    local: &LocalBackend,
    name: &str,
    path: &str,
) -> MicrosandboxResult<Vec<FsEntry>> {
    let client = connect_agent(local, name).await?;
    let op = FsOp::List {
        path: path.to_string(),
    };
    let resp: FsResponse = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?
        .payload()?;
    if !resp.ok {
        return Err(MicrosandboxError::SandboxFsOps(
            resp.error.unwrap_or_else(|| "unknown error".into()),
        ));
    }
    let infos = match resp.data {
        Some(FsResponseData::List(infos)) => infos,
        _ => return Ok(Vec::new()),
    };
    Ok(infos.into_iter().map(entry_info_to_fs_entry).collect())
}
/// Sends a mkdir request for `path` (no explicit mode) to sandbox `name`.
///
/// # Errors
///
/// Propagates connection and request errors and any failure the agent reports.
pub(crate) async fn mkdir(
    local: &LocalBackend,
    name: &str,
    path: &str,
) -> MicrosandboxResult<()> {
    let client = connect_agent(local, name).await?;
    let op = FsOp::Mkdir {
        path: path.to_string(),
        mode: None,
    };
    let reply = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?;
    check_response(reply)
}
/// Removes `path` inside sandbox `name`.
///
/// When `recursive` is `true` a `RemoveDir` request (with `recursive` set) is
/// sent; otherwise a plain `Remove` request is sent.
///
/// # Errors
///
/// Propagates connection and request errors and any failure the agent reports.
pub(crate) async fn remove(
    local: &LocalBackend,
    name: &str,
    path: &str,
    recursive: bool,
) -> MicrosandboxResult<()> {
    let client = connect_agent(local, name).await?;
    let target = path.to_string();
    let op = if !recursive {
        FsOp::Remove { path: target }
    } else {
        FsOp::RemoveDir {
            path: target,
            recursive,
        }
    };
    let reply = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?;
    check_response(reply)
}
/// Sends a copy request from `from` to `to` within sandbox `name`.
///
/// # Errors
///
/// Propagates connection and request errors and any failure the agent reports.
pub(crate) async fn copy(
    local: &LocalBackend,
    name: &str,
    from: &str,
    to: &str,
) -> MicrosandboxResult<()> {
    let client = connect_agent(local, name).await?;
    let op = FsOp::Copy {
        src: from.to_string(),
        dst: to.to_string(),
    };
    let reply = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?;
    check_response(reply)
}
/// Sends a rename request from `from` to `to` within sandbox `name`.
///
/// # Errors
///
/// Propagates connection and request errors and any failure the agent reports.
pub(crate) async fn rename(
    local: &LocalBackend,
    name: &str,
    from: &str,
    to: &str,
) -> MicrosandboxResult<()> {
    let client = connect_agent(local, name).await?;
    let op = FsOp::Rename {
        src: from.to_string(),
        dst: to.to_string(),
    };
    let reply = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?;
    check_response(reply)
}
/// Retrieves metadata for `path`, with symlink following enabled.
pub(crate) async fn stat(
    local: &LocalBackend,
    name: &str,
    path: &str,
) -> MicrosandboxResult<FsMetadata> {
    let follow_symlink = true;
    stat_with_follow(local, name, path, follow_symlink).await
}
/// Sends a stat request for `path`, passing `follow_symlink` to the agent,
/// and converts the reply into [`FsMetadata`].
///
/// # Errors
///
/// Returns `MicrosandboxError::SandboxFsOps` when the agent reports failure
/// or replies with anything other than stat data; propagates connection,
/// request and decoding errors.
pub(crate) async fn stat_with_follow(
    local: &LocalBackend,
    name: &str,
    path: &str,
    follow_symlink: bool,
) -> MicrosandboxResult<FsMetadata> {
    let client = connect_agent(local, name).await?;
    let op = FsOp::Stat {
        path: path.to_string(),
        follow_symlink,
    };
    let resp: FsResponse = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?
        .payload()?;
    if resp.ok {
        if let Some(FsResponseData::Stat(info)) = resp.data {
            Ok(entry_info_to_metadata(&info))
        } else {
            Err(MicrosandboxError::SandboxFsOps(
                "unexpected response data for stat".into(),
            ))
        }
    } else {
        Err(MicrosandboxError::SandboxFsOps(
            resp.error.unwrap_or_else(|| "unknown error".into()),
        ))
    }
}
/// Sends a set-stat request applying `attrs` to `path`, passing
/// `follow_symlink` to the agent.
///
/// # Errors
///
/// Propagates connection and request errors and any failure the agent reports.
pub(crate) async fn set_stat(
    local: &LocalBackend,
    name: &str,
    path: &str,
    follow_symlink: bool,
    attrs: FsSetAttrs,
) -> MicrosandboxResult<()> {
    let client = connect_agent(local, name).await?;
    let op = FsOp::SetStat {
        path: path.to_string(),
        follow_symlink,
        attrs,
    };
    let reply = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?;
    check_response(reply)
}
/// Sends a read-link request for `path` and returns the path string from the
/// agent's reply.
///
/// # Errors
///
/// Returns `MicrosandboxError::SandboxFsOps` when the agent reports failure
/// or replies with anything other than path data; propagates connection,
/// request and decoding errors.
pub(crate) async fn read_link(
    local: &LocalBackend,
    name: &str,
    path: &str,
) -> MicrosandboxResult<String> {
    let client = connect_agent(local, name).await?;
    let op = FsOp::ReadLink {
        path: path.to_string(),
    };
    let resp: FsResponse = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?
        .payload()?;
    if resp.ok {
        if let Some(FsResponseData::Path(link_target)) = resp.data {
            Ok(link_target)
        } else {
            Err(MicrosandboxError::SandboxFsOps(
                "unexpected response data for readlink".into(),
            ))
        }
    } else {
        Err(MicrosandboxError::SandboxFsOps(
            resp.error.unwrap_or_else(|| "unknown error".into()),
        ))
    }
}
/// Sends a symlink request creating `link_path` with target `target`.
///
/// # Errors
///
/// Propagates connection and request errors and any failure the agent reports.
pub(crate) async fn symlink(
    local: &LocalBackend,
    name: &str,
    target: &str,
    link_path: &str,
) -> MicrosandboxResult<()> {
    let client = connect_agent(local, name).await?;
    let op = FsOp::Symlink {
        target: target.to_string(),
        link_path: link_path.to_string(),
    };
    let reply = client
        .request(MessageType::FsRequest, &FsRequest { op })
        .await?;
    check_response(reply)
}
/// Reports whether `path` can be stat-ed inside sandbox `name`.
///
/// Any `MicrosandboxError::SandboxFsOps` from the stat call is interpreted as
/// "does not exist", whatever its message; other errors are propagated.
pub(crate) async fn exists(
    local: &LocalBackend,
    name: &str,
    path: &str,
) -> MicrosandboxResult<bool> {
    let outcome = stat(local, name, path).await;
    if matches!(outcome, Err(MicrosandboxError::SandboxFsOps(_))) {
        return Ok(false);
    }
    outcome.map(|_| true)
}
/// Streams the host file `host_path` into `guest_path` inside sandbox `name`.
///
/// The host file is opened before the guest write stream is created. Data is
/// forwarded in reads of up to `FS_CHUNK_SIZE` bytes, and the sink is closed
/// once the host file reports end-of-file.
///
/// # Errors
///
/// Propagates host I/O errors and any error from the write sink.
pub(crate) async fn copy_from_host(
    local: &LocalBackend,
    name: &str,
    host_path: &Path,
    guest_path: &str,
) -> MicrosandboxResult<()> {
    let mut source = tokio::fs::File::open(host_path).await?;
    let sink = write_stream(local, name, guest_path).await?;
    let mut scratch = vec![0u8; FS_CHUNK_SIZE];
    loop {
        match source.read(&mut scratch).await? {
            // A zero-length read means the host file is exhausted.
            0 => break,
            filled => sink.write(&scratch[..filled]).await?,
        }
    }
    sink.close().await
}
/// Reads `guest_path` from sandbox `name` fully into memory and then writes
/// it to `host_path` on the host.
///
/// # Errors
///
/// Propagates errors from the guest read and from the host file write.
pub(crate) async fn copy_to_host(
    local: &LocalBackend,
    name: &str,
    guest_path: &str,
    host_path: &Path,
) -> MicrosandboxResult<()> {
    let contents = read(local, name, guest_path).await?;
    tokio::fs::write(host_path, &contents)
        .await
        .map_err(Into::into)
}
}
pub use microsandbox_protocol::fs::FsSetAttrs;