use std::sync::Arc;
use async_trait::async_trait;
use crate::client::MasterClient;
use crate::config::{GooseFsConfig, WriteType};
use crate::context::FileSystemContext;
use crate::error::{Error, Result};
use crate::fs::filesystem::FileSystem;
use crate::fs::options::{CreateFileOptions, DeleteOptions, OpenFileOptions};
use crate::fs::uri_status::URIStatus;
use crate::fs::write_type::{get_write_type_from_xattr, WriteTypeXAttr};
use crate::io::{GooseFsFileInStream, GooseFsFileWriter};
use crate::proto::grpc::file::{CreateFilePOptions, WritePType};
pub struct BaseFileSystem {
ctx: Arc<FileSystemContext>,
config: GooseFsConfig,
}
impl BaseFileSystem {
pub fn from_context(ctx: Arc<FileSystemContext>) -> Arc<Self> {
let config = ctx.config().clone();
Arc::new(Self { config, ctx })
}
pub async fn connect(config: GooseFsConfig) -> Result<Arc<Self>> {
let ctx = FileSystemContext::connect(config).await?;
Ok(Self::from_context(ctx))
}
pub fn config(&self) -> &GooseFsConfig {
&self.config
}
fn master(&self) -> Arc<MasterClient> {
self.ctx.acquire_master()
}
async fn resolve_write_type(&self, path: &str, options: &CreateFileOptions) -> WriteType {
if let WriteTypeXAttr::Explicit(wt) = options.write_type {
return wt;
}
let parent = Self::parent_path(path);
if let Some(parent_path) = parent {
let master = self.master();
if let Ok(parent_info) = master.get_status(&parent_path).await {
let parent_status = URIStatus::from_proto(parent_info);
if let Some(wt) = get_write_type_from_xattr(&parent_status.xattr) {
return wt;
}
}
}
if let Some(proto_wt) = self.config.get_write_type() {
if let Ok(wt) = WriteType::try_from_proto(proto_wt) {
return wt;
}
}
WriteType::MustCache
}
fn parent_path(path: &str) -> Option<String> {
let trimmed = path.trim_end_matches('/');
if trimmed.is_empty() {
return None;
}
let last_slash = trimmed.rfind('/')?;
if last_slash == 0 {
Some("/".to_string())
} else {
Some(trimmed[..last_slash].to_string())
}
}
}
#[async_trait]
impl FileSystem for BaseFileSystem {
async fn get_status(&self, path: &str) -> Result<URIStatus> {
let master = self.master();
let fi = master.get_status(path).await?;
Ok(URIStatus::from_proto(fi))
}
async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<URIStatus>> {
let master = self.master();
let items = master.list_status(path, recursive).await?;
Ok(items.into_iter().map(URIStatus::from_proto).collect())
}
async fn exists(&self, path: &str) -> Result<bool> {
match self.get_status(path).await {
Ok(status) => {
Ok(status.is_readable())
}
Err(Error::NotFound { .. }) => Ok(false),
Err(e) => Err(e),
}
}
async fn open_file(&self, path: &str, options: OpenFileOptions) -> Result<GooseFsFileInStream> {
GooseFsFileInStream::open_with_context(self.ctx.clone(), path, options).await
}
async fn create_file(
&self,
path: &str,
options: CreateFileOptions,
) -> Result<GooseFsFileWriter> {
let write_type = self.resolve_write_type(path, &options).await;
let proto_opts = CreateFilePOptions {
block_size_bytes: options.block_size_bytes,
recursive: Some(options.recursive),
write_type: Some(WritePType::from(write_type) as i32),
..Default::default()
};
GooseFsFileWriter::create_with_context(self.ctx.clone(), path, Some(proto_opts)).await
}
async fn mkdir(&self, path: &str, recursive: bool) -> Result<()> {
let master = self.master();
master.create_directory(path, recursive).await
}
async fn delete(&self, path: &str, options: DeleteOptions) -> Result<()> {
let master = self.master();
master.delete_with_options(path, options).await
}
async fn rename(&self, src: &str, dst: &str) -> Result<()> {
let master = self.master();
master.rename(src, dst).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parent_path_normal() {
assert_eq!(
BaseFileSystem::parent_path("/data/hello.txt"),
Some("/data".to_string())
);
}
#[test]
fn test_parent_path_root_child() {
assert_eq!(
BaseFileSystem::parent_path("/hello.txt"),
Some("/".to_string())
);
}
#[test]
fn test_parent_path_root() {
assert_eq!(BaseFileSystem::parent_path("/"), None);
}
#[test]
fn test_parent_path_nested() {
assert_eq!(
BaseFileSystem::parent_path("/a/b/c/file.parquet"),
Some("/a/b/c".to_string())
);
}
#[test]
fn test_parent_path_trailing_slash() {
assert_eq!(
BaseFileSystem::parent_path("/data/dir/"),
Some("/data".to_string())
);
}
#[test]
fn test_from_context_sets_ctx() {
}
}