use serde_json::Value;
use crate::error::{Error, Result};
use crate::process_socket::ProcessSocket;
use crate::transport::DataPlaneClient;
use base64::Engine;
/// Kind of a filesystem entry, as decoded from the entry's type string.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum FileType {
    /// Decoded from the type string `"file"`.
    File,
    /// Decoded from the type string `"dir"` or `"directory"`.
    Dir,
    /// Decoded from the type string `"symlink"`.
    Symlink,
    /// Any other type string, kept verbatim.
    Other(String),
}
/// Description of one filesystem entry, decoded from a JSON object.
///
/// Keys that are absent (or hold a value of the wrong JSON type) leave the
/// corresponding field at its default.
#[derive(Debug, Default, Clone)]
pub struct EntryInfo {
    /// Value of the `"name"` key, or an empty string.
    pub name: String,
    /// Value of the `"path"` key, or an empty string.
    pub path: String,
    /// Entry kind decoded from the `"type"` key; `None` when the key is not a string.
    pub file_type: Option<FileType>,
    /// Size taken from the `"bytes"` key, falling back to `"size"`.
    pub size: Option<u64>,
    /// Contents of the `"metadata"` object, or an empty map.
    pub metadata: serde_json::Map<String, Value>,
}

/// What write operations return; an alias of [`EntryInfo`].
pub type WriteInfo = EntryInfo;
/// One file of a batch write: where to write it and what to write.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WriteEntry {
    /// Destination path of the file.
    pub path: String,
    /// Bytes to be written.
    pub data: Vec<u8>,
}

impl WriteEntry {
    /// Creates an entry from anything string-like and anything byte-like.
    ///
    /// `data` is copied into an owned buffer, so the caller keeps its input.
    pub fn new(path: impl Into<String>, data: impl AsRef<[u8]>) -> Self {
        let path: String = path.into();
        let data: Vec<u8> = Vec::from(data.as_ref());
        Self { path, data }
    }
}
/// Options accepted by `Filesystem::read_bytes_with_options`.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct FileReadOptions {
    /// Byte limit handed to the transport's `get_bytes_with_limit`; `None`
    /// (the default) passes no limit.
    pub max_bytes: Option<usize>,
}
/// One change notification decoded from a watch frame.
#[derive(Debug, Default, Clone)]
pub struct FilesystemEvent {
    /// Event kind. Decoding rewrites `"delete"` to `"remove"` and `"modify"`
    /// (also used when no type is given) to `"write"`; other strings are kept.
    pub event_type: String,
    /// Value of the event's `"path"` key, or an empty string.
    pub path: String,
    /// Last `/`-separated component of `path`, ignoring trailing slashes.
    pub name: String,
    /// Entry details decoded from the event's `"file"` key, when present.
    pub entry: Option<EntryInfo>,
    /// The JSON value this event was decoded from, unmodified.
    pub raw: Value,
}
/// Options for `Filesystem::watch_dir`; both flags default to `false`.
#[derive(Debug, Default, Clone)]
pub struct WatchOptions {
    /// Sent as the `recursive` query parameter of the watch request.
    pub recursive: bool,
    /// Sent as the `include_entry` query parameter of the watch request.
    pub include_entry: bool,
}
/// Handle to an open directory watch; events arrive over the wrapped socket.
pub struct WatchHandle {
    socket: ProcessSocket,
}

impl WatchHandle {
    /// Waits for the next batch of filesystem events.
    ///
    /// Frames whose `"type"` is not `"events"` are skipped. An `"events"` frame
    /// without an `"events"` array produces an empty batch.
    ///
    /// Returns `Ok(None)` once the socket yields no further frames.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while reading a frame from the socket.
    pub async fn next_events(&mut self) -> Result<Option<Vec<FilesystemEvent>>> {
        loop {
            let frame = match self.socket.next_frame().await? {
                Some(frame) => frame,
                None => return Ok(None),
            };

            let is_event_frame =
                matches!(frame.get("type").and_then(Value::as_str), Some("events"));
            if !is_event_frame {
                continue;
            }

            let batch: Vec<FilesystemEvent> =
                match frame.get("events").and_then(Value::as_array) {
                    Some(items) => items.iter().cloned().map(filesystem_event).collect(),
                    None => Vec::new(),
                };
            return Ok(Some(batch));
        }
    }

    /// Ends the watch by closing the underlying socket.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the socket's `close`.
    pub async fn stop(&mut self) -> Result<()> {
        self.socket.close().await
    }
}
#[derive(Clone)]
pub struct Filesystem {
data_plane: DataPlaneClient,
}
impl Filesystem {
pub(crate) fn new(data_plane: DataPlaneClient) -> Self {
Self { data_plane }
}
pub async fn read_text(&self, path: &str) -> Result<String> {
let bytes = self.read_bytes(path).await?;
Ok(String::from_utf8_lossy(&bytes).into_owned())
}
pub async fn read_bytes(&self, path: &str) -> Result<Vec<u8>> {
self.read_bytes_with_options(path, FileReadOptions::default())
.await
}
pub async fn read_bytes_with_options(
&self,
path: &str,
opts: FileReadOptions,
) -> Result<Vec<u8>> {
self.data_plane
.get_bytes_with_limit(
&format!("/runtime/v1/files?path={}", urlencoding(path)),
opts.max_bytes,
)
.await
}
pub async fn write(&self, path: &str, data: impl AsRef<[u8]>) -> Result<WriteInfo> {
let payload = self
.data_plane
.put_bytes(
&format!("/runtime/v1/files?path={}", urlencoding(path)),
data.as_ref().to_vec(),
)
.await?;
Ok(entry_info(payload.get("file").unwrap_or(&payload)))
}
pub async fn write_files(&self, files: Vec<WriteEntry>) -> Result<Vec<WriteInfo>> {
if files.is_empty() {
return Ok(vec![]);
}
let payload = self
.data_plane
.post_json("/runtime/v1/files/write_files", write_files_payload(&files))
.await?;
Ok(payload
.get("files")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default()
.into_iter()
.map(|value| entry_info(&value))
.collect())
}
pub async fn list(&self, path: &str) -> Result<Vec<EntryInfo>> {
let payload = self
.data_plane
.get_json(&format!(
"/runtime/v1/directories?path={}",
urlencoding(path)
))
.await?;
Ok(payload
.get("entries")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default()
.into_iter()
.map(|value| entry_info(&value))
.collect())
}
pub async fn exists(&self, path: &str) -> Result<bool> {
match self.get_info(path).await {
Ok(_) => Ok(true),
Err(Error::FileNotFound(_)) | Err(Error::NotFound(_)) => Ok(false),
Err(error) => Err(error),
}
}
pub async fn get_info(&self, path: &str) -> Result<EntryInfo> {
let payload = self
.data_plane
.get_json(&format!(
"/runtime/v1/files/stat?path={}",
urlencoding(path)
))
.await?;
Ok(entry_info(
payload
.get("file")
.or_else(|| payload.get("entry"))
.unwrap_or(&payload),
))
}
pub async fn remove(&self, path: &str) -> Result<()> {
self.data_plane
.delete_json(&format!("/runtime/v1/files?path={}", urlencoding(path)))
.await?;
Ok(())
}
pub async fn rename(&self, old_path: &str, new_path: &str) -> Result<EntryInfo> {
let payload = self
.data_plane
.post_json(
"/runtime/v1/files/move",
serde_json::json!({"from_path": old_path, "to_path": new_path}),
)
.await?;
Ok(entry_info(payload.get("file").unwrap_or(&payload)))
}
pub async fn make_dir(&self, path: &str) -> Result<bool> {
self.data_plane
.post_json(
&format!("/runtime/v1/directories?path={}", urlencoding(path)),
serde_json::json!({}),
)
.await?;
Ok(true)
}
pub async fn watch_dir(&self, path: &str, opts: WatchOptions) -> Result<WatchHandle> {
let query = format!(
"path={}&recursive={}&include_entry={}",
urlencoding(path),
opts.recursive,
opts.include_entry
);
let socket = ProcessSocket::connect(
&self.data_plane.base_url,
&self.data_plane.token,
&format!("/runtime/v1/files/watch?{query}"),
)
.await?;
Ok(WatchHandle { socket })
}
}
/// Decodes an [`EntryInfo`] from a JSON value, tolerating missing or mistyped keys.
///
/// * `name`, `path`: string values, defaulting to `""`.
/// * `file_type`: the `"type"` string mapped through [`file_type`]; `None` when
///   the key is absent or not a string.
/// * `size`: the first of `"bytes"`, `"size"` that holds an unsigned integer.
/// * `metadata`: the `"metadata"` object, defaulting to an empty map.
fn entry_info(value: &Value) -> EntryInfo {
    let text_at = |key: &str| -> String {
        value
            .get(key)
            .and_then(Value::as_str)
            .unwrap_or_default()
            .to_string()
    };

    // Each key is tried on its own so that a present-but-non-numeric `bytes`
    // (for example `null`) does not hide a usable `size`.
    let size = ["bytes", "size"]
        .iter()
        .find_map(|key| value.get(*key).and_then(Value::as_u64));

    EntryInfo {
        name: text_at("name"),
        path: text_at("path"),
        file_type: value.get("type").and_then(Value::as_str).map(file_type),
        size,
        metadata: value
            .get("metadata")
            .and_then(Value::as_object)
            .cloned()
            .unwrap_or_default(),
    }
}
/// Maps a type string to a [`FileType`].
///
/// Matching is exact and case-sensitive; anything unrecognised is preserved
/// verbatim in [`FileType::Other`].
fn file_type(value: &str) -> FileType {
    match value {
        "symlink" => FileType::Symlink,
        "directory" | "dir" => FileType::Dir,
        "file" => FileType::File,
        unknown => FileType::Other(String::from(unknown)),
    }
}
/// Decodes one element of a watch frame's `"events"` array.
///
/// * `path`: the `"path"` string, or `""`.
/// * `event_type`: the `"type"` string with `"delete"` rewritten to `"remove"`
///   and `"modify"` to `"write"`; a missing type is treated as `"modify"`.
/// * `entry`: decoded from the `"file"` key when that key exists.
/// * `name`: last `/`-separated component of `path`, ignoring trailing slashes.
/// * `raw`: the input value itself, moved into the event.
fn filesystem_event(value: Value) -> FilesystemEvent {
    let path = match value.get("path").and_then(Value::as_str) {
        Some(text) => text.to_owned(),
        None => String::new(),
    };

    let event_type = match value.get("type").and_then(Value::as_str) {
        Some("delete") => "remove",
        // An absent (or non-string) type is handled exactly like "modify".
        None | Some("modify") => "write",
        Some(other) => other,
    }
    .to_owned();

    let entry = value.get("file").map(entry_info);

    // Derived before `path` is moved into the struct below.
    let name = path
        .trim_end_matches('/')
        .rsplit('/')
        .next()
        .map(str::to_owned)
        .unwrap_or_default();

    FilesystemEvent {
        event_type,
        path,
        name,
        entry,
        raw: value,
    }
}
/// Encodes `value` for use as a query-string value.
///
/// Uses the `application/x-www-form-urlencoded` byte serializer from the `url`
/// crate, which emits a space as `+`.
fn urlencoding(value: &str) -> String {
    let mut encoded = String::with_capacity(value.len());
    encoded.extend(url::form_urlencoded::byte_serialize(value.as_bytes()));
    encoded
}
fn write_files_payload(files: &[WriteEntry]) -> Value {
serde_json::json!({
"files": files
.iter()
.map(|file| {
serde_json::json!({
"path": file.path.as_str(),
"data_base64": base64::engine::general_purpose::STANDARD.encode(&file.data),
})
})
.collect::<Vec<_>>()
})
}
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{write_files_payload, WriteEntry};

    /// The batch-write body uses snake_case keys and base64-encodes both text
    /// and binary entries.
    #[test]
    fn write_files_payload_uses_snake_case_base64_entries() {
        let entries = [
            WriteEntry::new("/tmp/a.txt", "abc"),
            WriteEntry::new("/tmp/b.bin", [0, 1, 2]),
        ];
        let expected = json!({
            "files": [
                {"path": "/tmp/a.txt", "data_base64": "YWJj"},
                {"path": "/tmp/b.bin", "data_base64": "AAEC"}
            ]
        });

        let actual = write_files_payload(&entries);

        assert_eq!(actual, expected);
    }
}