use std::{path::Path, sync::Arc};
use bytes::Bytes;
use microsandbox_protocol::{
fs::{FS_CHUNK_SIZE, FsData, FsEntryInfo, FsOp, FsRequest, FsResponse, FsResponseData},
message::{Message, MessageType},
};
use tokio::sync::mpsc;
use crate::{MicrosandboxError, MicrosandboxResult, agent::AgentClient};
/// Filesystem operations on a sandbox, issued through a borrowed agent client.
///
/// Every method builds an `FsRequest`, sends it via the client, and interprets
/// the `FsResponse` / `FsData` messages that come back.
pub struct SandboxFs<'a> {
/// Agent client used to send requests and to subscribe to their replies.
client: &'a Arc<AgentClient>,
}
/// One entry of a directory listing, as returned by `SandboxFs::list`.
///
/// Built from a protocol-level `FsEntryInfo` by `entry_info_to_fs_entry`.
#[derive(Debug, Clone)]
pub struct FsEntry {
/// Path of the entry, copied verbatim from the agent's reply.
pub path: String,
/// Entry type, parsed from the reply's string tag.
pub kind: FsEntryKind,
/// Size as reported by the agent (presumably bytes — confirm against the protocol).
pub size: u64,
/// Mode bits as reported by the agent; elsewhere in this file bit `0o200`
/// is read as the write-permission bit.
pub mode: u32,
/// Modification time, converted from a whole-second Unix timestamp when
/// the agent supplied one.
pub modified: Option<chrono::DateTime<chrono::Utc>>,
}
/// Type of a filesystem entry.
///
/// `parse_kind` maps the string tags `"file"`, `"dir"` and `"symlink"` to the
/// first three variants; every other tag becomes `Other`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FsEntryKind {
/// Tag `"file"`.
File,
/// Tag `"dir"`.
Directory,
/// Tag `"symlink"`.
Symlink,
/// Any tag that is not one of the three above.
Other,
}
/// Metadata for a single path, as returned by `SandboxFs::stat`.
///
/// Built from a protocol-level `FsEntryInfo` by `entry_info_to_metadata`.
#[derive(Debug, Clone)]
pub struct FsMetadata {
/// Entry type, parsed from the reply's string tag.
pub kind: FsEntryKind,
/// Size as reported by the agent (presumably bytes — confirm against the protocol).
pub size: u64,
/// Mode bits as reported by the agent.
pub mode: u32,
/// `true` when bit `0o200` of `mode` is clear.
pub readonly: bool,
/// Modification time, converted from a whole-second Unix timestamp when
/// the agent supplied one.
pub modified: Option<chrono::DateTime<chrono::Utc>>,
/// Creation time. The conversion in this file always sets this to `None`.
pub created: Option<chrono::DateTime<chrono::Utc>>,
}
/// Handle for a streaming read started by `SandboxFs::read_stream`.
///
/// Chunks are pulled with `recv`, or gathered all at once with `collect`.
pub struct FsReadStream {
/// Receiver for the messages subscribed under the read request's id.
rx: mpsc::UnboundedReceiver<Message>,
}
/// Handle for a streaming write started by `SandboxFs::write_stream`.
///
/// Data is pushed with `write`; `close` sends the final empty frame and waits
/// for the agent's response.
pub struct FsWriteSink {
/// Message id of the write request; every data frame is tagged with it.
id: u32,
/// Owned clone of the agent client, used to send the data frames.
client: Arc<AgentClient>,
/// Receiver for the messages subscribed under `id`.
rx: mpsc::UnboundedReceiver<Message>,
}
impl<'a> SandboxFs<'a> {
pub fn new(client: &'a Arc<AgentClient>) -> Self {
Self { client }
}
/// Reads the whole file at `path` into memory.
///
/// A fresh message id is subscribed to, an `FsOp::Read` request is sent under
/// that id, and every `FsData` chunk received is appended to the result until
/// an `FsResponse` arrives. Messages of any other type are ignored.
///
/// # Errors
/// - The request cannot be encoded or sent, or a reply cannot be decoded.
/// - The agent answers with a failed `FsResponse`; its error text (or
///   `"unknown error"`) is returned as `MicrosandboxError::SandboxFs`.
/// - The reply channel closes before an `FsResponse` is received. The data
///   gathered so far may be incomplete, so it is discarded rather than
///   returned as if the read had succeeded.
pub async fn read(&self, path: &str) -> MicrosandboxResult<Bytes> {
    let id = self.client.next_id();
    let mut rx = self.client.subscribe(id).await;

    let req = FsRequest {
        op: FsOp::Read {
            path: path.to_string(),
        },
    };
    let msg = Message::with_payload(MessageType::FsRequest, id, &req)?;
    self.client.send(&msg).await?;

    let mut data = Vec::new();
    while let Some(msg) = rx.recv().await {
        match msg.t {
            MessageType::FsData => {
                let chunk: FsData = msg.payload()?;
                data.extend_from_slice(&chunk.data);
            }
            MessageType::FsResponse => {
                // The response is the terminal message of the exchange.
                let resp: FsResponse = msg.payload()?;
                return if resp.ok {
                    Ok(Bytes::from(data))
                } else {
                    Err(MicrosandboxError::SandboxFs(
                        resp.error.unwrap_or_else(|| "unknown error".into()),
                    ))
                };
            }
            _ => {}
        }
    }

    // The channel ended without a terminal response: same condition and
    // message that `wait_for_ok_response` reports for writes.
    Err(MicrosandboxError::SandboxFs(
        "channel closed before response".into(),
    ))
}
/// Reads the whole file at `path` and decodes it as UTF-8.
///
/// # Errors
/// Propagates any error from `read`, and returns
/// `MicrosandboxError::SandboxFs("invalid utf-8: …")` when the bytes are not
/// valid UTF-8.
pub async fn read_to_string(&self, path: &str) -> MicrosandboxResult<String> {
    let raw: Vec<u8> = self.read(path).await?.into();
    match String::from_utf8(raw) {
        Ok(text) => Ok(text),
        Err(e) => Err(MicrosandboxError::SandboxFs(format!("invalid utf-8: {e}"))),
    }
}
/// Starts a read of `path` and returns a stream over the incoming chunks.
///
/// A fresh message id is subscribed to first, then the `FsOp::Read` request
/// is sent under that id. The returned `FsReadStream` owns the subscription's
/// receiver.
///
/// # Errors
/// Fails if the request cannot be encoded or sent. Failures reported by the
/// agent surface later, from `FsReadStream::recv`.
pub async fn read_stream(&self, path: &str) -> MicrosandboxResult<FsReadStream> {
    let stream_id = self.client.next_id();
    let rx = self.client.subscribe(stream_id).await;

    let op = FsOp::Read {
        path: path.to_owned(),
    };
    let request = Message::with_payload(MessageType::FsRequest, stream_id, &FsRequest { op })?;
    self.client.send(&request).await?;

    Ok(FsReadStream { rx })
}
/// Writes `data` to the file at `path` in a single call.
///
/// Sends an `FsOp::Write` request (with `mode: None`) under a freshly
/// subscribed id, then the payload split into frames of at most
/// `FS_CHUNK_SIZE` bytes, then one empty `FsData` frame, and finally waits
/// for the agent's `FsResponse`.
///
/// # Errors
/// Fails if any message cannot be encoded or sent, if the agent reports a
/// failure, or if the reply channel closes before a response arrives.
pub async fn write(&self, path: &str, data: impl AsRef<[u8]>) -> MicrosandboxResult<()> {
    let bytes = data.as_ref();
    let id = self.client.next_id();
    let mut rx = self.client.subscribe(id).await;

    let op = FsOp::Write {
        path: path.to_owned(),
        mode: None,
    };
    let open = Message::with_payload(MessageType::FsRequest, id, &FsRequest { op })?;
    self.client.send(&open).await?;

    // Payload frames first, followed by exactly one empty frame that marks
    // the end of the data.
    let eof: &[u8] = &[];
    for frame in bytes.chunks(FS_CHUNK_SIZE).chain(std::iter::once(eof)) {
        let body = FsData {
            data: frame.to_vec(),
        };
        let msg = Message::with_payload(MessageType::FsData, id, &body)?;
        self.client.send(&msg).await?;
    }

    wait_for_ok_response(&mut rx).await
}
/// Starts a write to `path` and returns a sink for pushing the data.
///
/// A fresh message id is subscribed to first, then the `FsOp::Write` request
/// (with `mode: None`) is sent under that id. The returned `FsWriteSink`
/// holds the id, its own clone of the client `Arc`, and the subscription's
/// receiver.
///
/// # Errors
/// Fails if the request cannot be encoded or sent.
pub async fn write_stream(&self, path: &str) -> MicrosandboxResult<FsWriteSink> {
    let stream_id = self.client.next_id();
    let rx = self.client.subscribe(stream_id).await;

    let op = FsOp::Write {
        path: path.to_owned(),
        mode: None,
    };
    let request = Message::with_payload(MessageType::FsRequest, stream_id, &FsRequest { op })?;
    self.client.send(&request).await?;

    let client = Arc::clone(self.client);
    Ok(FsWriteSink {
        id: stream_id,
        client,
        rx,
    })
}
/// Lists the entries of the directory at `path`.
///
/// Sends an `FsOp::List` request through `client.request` and converts each
/// returned `FsEntryInfo` into an `FsEntry`.
///
/// A successful response that carries no listing payload yields an empty
/// vector rather than an error.
///
/// # Errors
/// Fails if the request cannot be encoded or sent, the reply cannot be
/// decoded, or the agent reports a failure (its error text, or
/// `"unknown error"`, is returned as `MicrosandboxError::SandboxFs`).
pub async fn list(&self, path: &str) -> MicrosandboxResult<Vec<FsEntry>> {
    let op = FsOp::List {
        path: path.to_owned(),
    };
    let request = Message::with_payload(MessageType::FsRequest, 0, &FsRequest { op })?;
    let reply = self.client.request(request).await?;
    let resp: FsResponse = reply.payload()?;

    if !resp.ok {
        let reason = resp.error.unwrap_or_else(|| "unknown error".into());
        return Err(MicrosandboxError::SandboxFs(reason));
    }

    let Some(FsResponseData::List(infos)) = resp.data else {
        return Ok(Vec::new());
    };
    Ok(infos.into_iter().map(entry_info_to_fs_entry).collect())
}
/// Asks the agent to create a directory at `path`.
///
/// Sends an `FsOp::Mkdir` request through `client.request` and checks the
/// resulting `FsResponse`.
///
/// # Errors
/// Fails if the request cannot be encoded or sent, the reply cannot be
/// decoded, or the agent reports a failure.
pub async fn mkdir(&self, path: &str) -> MicrosandboxResult<()> {
    let op = FsOp::Mkdir {
        path: path.to_owned(),
    };
    let request = Message::with_payload(MessageType::FsRequest, 0, &FsRequest { op })?;
    let reply = self.client.request(request).await?;
    check_response(reply)
}
/// Asks the agent to remove the directory at `path`.
///
/// Sends an `FsOp::RemoveDir` request through `client.request` and checks the
/// resulting `FsResponse`.
///
/// # Errors
/// Fails if the request cannot be encoded or sent, the reply cannot be
/// decoded, or the agent reports a failure.
pub async fn remove_dir(&self, path: &str) -> MicrosandboxResult<()> {
    let op = FsOp::RemoveDir {
        path: path.to_owned(),
    };
    let request = Message::with_payload(MessageType::FsRequest, 0, &FsRequest { op })?;
    let reply = self.client.request(request).await?;
    check_response(reply)
}
/// Asks the agent to remove the entry at `path`.
///
/// Sends an `FsOp::Remove` request through `client.request` and checks the
/// resulting `FsResponse`.
///
/// # Errors
/// Fails if the request cannot be encoded or sent, the reply cannot be
/// decoded, or the agent reports a failure.
pub async fn remove(&self, path: &str) -> MicrosandboxResult<()> {
    let op = FsOp::Remove {
        path: path.to_owned(),
    };
    let request = Message::with_payload(MessageType::FsRequest, 0, &FsRequest { op })?;
    let reply = self.client.request(request).await?;
    check_response(reply)
}
/// Asks the agent to copy `from` to `to` inside the sandbox.
///
/// Sends an `FsOp::Copy` request (`src` = `from`, `dst` = `to`) through
/// `client.request` and checks the resulting `FsResponse`.
///
/// # Errors
/// Fails if the request cannot be encoded or sent, the reply cannot be
/// decoded, or the agent reports a failure.
pub async fn copy(&self, from: &str, to: &str) -> MicrosandboxResult<()> {
    let op = FsOp::Copy {
        src: from.to_owned(),
        dst: to.to_owned(),
    };
    let request = Message::with_payload(MessageType::FsRequest, 0, &FsRequest { op })?;
    let reply = self.client.request(request).await?;
    check_response(reply)
}
/// Asks the agent to rename `from` to `to` inside the sandbox.
///
/// Sends an `FsOp::Rename` request (`src` = `from`, `dst` = `to`) through
/// `client.request` and checks the resulting `FsResponse`.
///
/// # Errors
/// Fails if the request cannot be encoded or sent, the reply cannot be
/// decoded, or the agent reports a failure.
pub async fn rename(&self, from: &str, to: &str) -> MicrosandboxResult<()> {
    let op = FsOp::Rename {
        src: from.to_owned(),
        dst: to.to_owned(),
    };
    let request = Message::with_payload(MessageType::FsRequest, 0, &FsRequest { op })?;
    let reply = self.client.request(request).await?;
    check_response(reply)
}
/// Fetches metadata for the entry at `path`.
///
/// Sends an `FsOp::Stat` request through `client.request` and converts the
/// returned `FsEntryInfo` into an `FsMetadata`.
///
/// # Errors
/// Fails if the request cannot be encoded or sent, the reply cannot be
/// decoded, the agent reports a failure, or a successful response does not
/// carry stat data (`"unexpected response data for stat"`).
pub async fn stat(&self, path: &str) -> MicrosandboxResult<FsMetadata> {
    let op = FsOp::Stat {
        path: path.to_owned(),
    };
    let request = Message::with_payload(MessageType::FsRequest, 0, &FsRequest { op })?;
    let reply = self.client.request(request).await?;
    let resp: FsResponse = reply.payload()?;

    if !resp.ok {
        let reason = resp.error.unwrap_or_else(|| "unknown error".into());
        return Err(MicrosandboxError::SandboxFs(reason));
    }

    if let Some(FsResponseData::Stat(info)) = resp.data {
        return Ok(entry_info_to_metadata(&info));
    }
    Err(MicrosandboxError::SandboxFs(
        "unexpected response data for stat".into(),
    ))
}
/// Reports whether `stat` succeeds for `path`.
///
/// Returns `Ok(true)` when `stat` succeeds and `Ok(false)` when it fails with
/// `MicrosandboxError::SandboxFs`. Note that this covers *every* filesystem
/// error `stat` can produce, not only "not found", because the error carries
/// just a message string.
///
/// # Errors
/// Any other error kind from `stat` is passed through unchanged.
pub async fn exists(&self, path: &str) -> MicrosandboxResult<bool> {
    match self.stat(path).await {
        Err(MicrosandboxError::SandboxFs(_)) => Ok(false),
        outcome => outcome.map(|_| true),
    }
}
/// Copies a file from the host into the sandbox.
///
/// The host file is read fully into memory with `tokio::fs::read` and then
/// handed to `write` for `guest_path`.
///
/// # Errors
/// Fails if the host file cannot be read, or with any error from `write`.
pub async fn copy_from_host(
    &self,
    host_path: impl AsRef<Path>,
    guest_path: &str,
) -> MicrosandboxResult<()> {
    let source = host_path.as_ref();
    let contents = tokio::fs::read(source).await?;
    self.write(guest_path, &contents).await
}
/// Copies a file from the sandbox onto the host.
///
/// The guest file is fetched fully into memory with `read` and then written
/// to `host_path` with `tokio::fs::write`.
///
/// # Errors
/// Fails with any error from `read`, or if the host file cannot be written.
pub async fn copy_to_host(
    &self,
    guest_path: &str,
    host_path: impl AsRef<Path>,
) -> MicrosandboxResult<()> {
    let contents = self.read(guest_path).await?;
    let target = host_path.as_ref();
    tokio::fs::write(target, &contents).await?;
    Ok(())
}
}
impl FsReadStream {
    /// Waits for the next non-empty chunk of file data.
    ///
    /// Returns `Ok(Some(bytes))` for each non-empty `FsData` message. Empty
    /// chunks and messages of other types are skipped. Returns `Ok(None)`
    /// once a successful `FsResponse` arrives, and also when the underlying
    /// channel is closed.
    ///
    /// # Errors
    /// Fails if a message cannot be decoded, or if the agent sends a failed
    /// `FsResponse` (its error text, or `"unknown error"`, is returned as
    /// `MicrosandboxError::SandboxFs`).
    pub async fn recv(&mut self) -> MicrosandboxResult<Option<Bytes>> {
        loop {
            let Some(msg) = self.rx.recv().await else {
                return Ok(None);
            };

            match msg.t {
                MessageType::FsData => {
                    let chunk: FsData = msg.payload()?;
                    if chunk.data.is_empty() {
                        continue;
                    }
                    return Ok(Some(Bytes::from(chunk.data)));
                }
                MessageType::FsResponse => {
                    let resp: FsResponse = msg.payload()?;
                    if resp.ok {
                        return Ok(None);
                    }
                    let reason = resp.error.unwrap_or_else(|| "unknown error".into());
                    return Err(MicrosandboxError::SandboxFs(reason));
                }
                _ => continue,
            }
        }
    }

    /// Drains the stream and returns everything received as one buffer.
    ///
    /// Calls `recv` until it reports the end of the stream, concatenating the
    /// chunks in arrival order.
    ///
    /// # Errors
    /// Returns the first error produced by `recv`.
    pub async fn collect(mut self) -> MicrosandboxResult<Bytes> {
        let mut buf = Vec::new();
        loop {
            match self.recv().await? {
                Some(chunk) => buf.extend_from_slice(&chunk),
                None => break,
            }
        }
        Ok(Bytes::from(buf))
    }
}
impl FsWriteSink {
pub async fn write(&self, data: impl AsRef<[u8]>) -> MicrosandboxResult<()> {
let fs_data = FsData {
data: data.as_ref().to_vec(),
};
let msg = Message::with_payload(MessageType::FsData, self.id, &fs_data)?;
self.client.send(&msg).await
}
pub async fn close(mut self) -> MicrosandboxResult<()> {
let eof = FsData { data: Vec::new() };
let msg = Message::with_payload(MessageType::FsData, self.id, &eof)?;
self.client.send(&msg).await?;
wait_for_ok_response(&mut self.rx).await
}
}
/// Maps the protocol's string tag for an entry type onto `FsEntryKind`.
///
/// Recognizes exactly `"file"`, `"dir"` and `"symlink"` (case-sensitive);
/// every other input yields `FsEntryKind::Other`.
fn parse_kind(s: &str) -> FsEntryKind {
    const KNOWN: [(&str, FsEntryKind); 3] = [
        ("file", FsEntryKind::File),
        ("dir", FsEntryKind::Directory),
        ("symlink", FsEntryKind::Symlink),
    ];

    KNOWN
        .iter()
        .find(|(tag, _)| *tag == s)
        .map_or(FsEntryKind::Other, |&(_, kind)| kind)
}
/// Converts an optional Unix timestamp in whole seconds into a UTC time.
///
/// Returns `None` when no timestamp was supplied, and also when the value is
/// outside the range chrono can represent. An unrepresentable timestamp used
/// to be replaced by the default `DateTime` (the Unix epoch), which reported
/// a made-up modification time as if it were real.
fn parse_modified(ts: Option<i64>) -> Option<chrono::DateTime<chrono::Utc>> {
    ts.and_then(|secs| chrono::DateTime::from_timestamp(secs, 0))
}
/// Converts a protocol-level `FsEntryInfo` into the public `FsEntry`.
///
/// The kind tag and timestamp are parsed; path, size and mode are carried
/// over unchanged.
fn entry_info_to_fs_entry(info: FsEntryInfo) -> FsEntry {
    // Parse the borrowed fields before `info.path` is moved out.
    let kind = parse_kind(&info.kind);
    let modified = parse_modified(info.modified);

    FsEntry {
        path: info.path,
        kind,
        size: info.size,
        mode: info.mode,
        modified,
    }
}
/// Converts a protocol-level `FsEntryInfo` into the public `FsMetadata`.
///
/// The kind tag and timestamp are parsed; size and mode are carried over
/// unchanged. `readonly` is `true` when bit `0o200` of the mode is clear, and
/// `created` is always `None`.
fn entry_info_to_metadata(info: &FsEntryInfo) -> FsMetadata {
    let mode = info.mode;
    let write_bit_set = mode & 0o200 != 0;

    FsMetadata {
        kind: parse_kind(&info.kind),
        size: info.size,
        mode,
        readonly: !write_bit_set,
        modified: parse_modified(info.modified),
        created: None,
    }
}
/// Decodes `msg` as an `FsResponse` and turns its status into a `Result`.
///
/// # Errors
/// Fails if the payload cannot be decoded, or if the response is not `ok`;
/// in that case the response's error text (or `"unknown error"`) is returned
/// as `MicrosandboxError::SandboxFs`.
fn check_response(msg: Message) -> MicrosandboxResult<()> {
    let resp: FsResponse = msg.payload()?;
    if !resp.ok {
        let reason = resp.error.unwrap_or_else(|| "unknown error".into());
        return Err(MicrosandboxError::SandboxFs(reason));
    }
    Ok(())
}
/// Waits on `rx` for the first `FsResponse` message and checks its status.
///
/// Messages of any other type are discarded while waiting.
///
/// # Errors
/// Returns whatever `check_response` reports for the response, or
/// `MicrosandboxError::SandboxFs("channel closed before response")` if the
/// channel ends without delivering one.
async fn wait_for_ok_response(rx: &mut mpsc::UnboundedReceiver<Message>) -> MicrosandboxResult<()> {
    loop {
        match rx.recv().await {
            Some(msg) if msg.t == MessageType::FsResponse => return check_response(msg),
            Some(_) => continue,
            None => {
                return Err(MicrosandboxError::SandboxFs(
                    "channel closed before response".into(),
                ));
            }
        }
    }
}