use std::collections::HashMap;
use crate::proto::grpc::file::{FileBlockInfo, FileInfo};
/// Flattened, owned view of a `FileInfo` proto message.
///
/// Instances are produced by `URIStatus::from_proto`, which unwraps the
/// optional scalar proto fields; the per-field notes below give the fallback
/// value used when the proto field is unset.
#[derive(Debug, Clone)]
pub struct URIStatus {
/// From `FileInfo::file_id`; `0` when unset.
pub file_id: i64,
/// From `FileInfo::name`; empty string when unset.
pub name: String,
/// From `FileInfo::path`; empty string when unset.
pub path: String,
/// From `FileInfo::ufs_path`; empty string when unset.
pub ufs_path: String,
/// From `FileInfo::length`; `0` when unset. Used as the exclusive upper
/// bound for offsets in `block_index_for_offset` and `offset_in_block`.
pub length: i64,
/// From `FileInfo::block_size_bytes`; `0` when unset. The offset helpers
/// divide by this value and return `None` when it is not positive.
pub block_size_bytes: i64,
/// Moved unchanged from `FileInfo::block_ids`; `block_count` reports its length.
pub block_ids: Vec<i64>,
/// From `FileInfo::creation_time_ms`; `0` when unset.
/// NOTE(review): presumably epoch milliseconds, judging by the name — confirm against the proto.
pub creation_time_ms: i64,
/// From `FileInfo::last_modification_time_ms`; `0` when unset.
pub last_modification_time_ms: i64,
/// From `FileInfo::last_access_time_ms`; `0` when unset.
pub last_access_time_ms: i64,
/// From `FileInfo::completed`; `false` when unset. One of the two flags
/// consulted by `is_readable`.
pub completed: bool,
/// From `FileInfo::folder`; `false` when unset. One of the two flags
/// consulted by `is_readable`.
pub folder: bool,
/// From `FileInfo::cacheable`; `false` when unset.
pub cacheable: bool,
/// From `FileInfo::persisted`; `false` when unset.
pub persisted: bool,
/// From `FileInfo::mount_point`; `false` when unset.
pub mount_point: bool,
/// From `FileInfo::in_goose_fs_percentage`; `0` when unset.
pub in_goose_fs_percentage: i32,
/// From `FileInfo::in_memory_percentage`; `0` when unset.
pub in_memory_percentage: i32,
/// From `FileInfo::owner`; empty string when unset.
pub owner: String,
/// From `FileInfo::group`; empty string when unset.
pub group: String,
/// From `FileInfo::mode`; `0` when unset.
pub mode: i32,
/// From `FileInfo::persistence_state`; empty string when unset.
pub persistence_state: String,
/// From `FileInfo::mount_id`; `0` when unset.
pub mount_id: i64,
/// From `FileInfo::ufs_fingerprint`; empty string when unset.
pub ufs_fingerprint: String,
/// Moved unchanged from `FileInfo::xattr`.
pub xattr: HashMap<String, Vec<u8>>,
/// Moved unchanged from `FileInfo::symlink`; stays `None` when unset.
pub symlink: Option<String>,
// Block metadata keyed by block id. Kept private: it is filled by
// `from_proto` / `add_block_info` and read through `get_block_info`,
// `has_block_infos` and `block_infos`.
block_infos: HashMap<i64, FileBlockInfo>,
}
impl URIStatus {
    /// Converts a `FileInfo` proto message into a `URIStatus`, consuming it.
    ///
    /// Unset optional scalar fields fall back to `0`, `false`, or the empty
    /// string. `block_ids`, `xattr` and `symlink` are moved over unchanged.
    ///
    /// Every entry of `file_block_infos` that carries a block id is indexed by
    /// that id; entries without a `block_info` or without a `block_id` are
    /// dropped. If several entries share an id, the last one is kept.
    pub fn from_proto(fi: FileInfo) -> Self {
        let block_infos: HashMap<i64, FileBlockInfo> = fi
            .file_block_infos
            .into_iter()
            .filter_map(|fbi| Self::block_id_of(&fbi).map(|id| (id, fbi)))
            .collect();

        Self {
            file_id: fi.file_id.unwrap_or(0),
            name: fi.name.unwrap_or_default(),
            path: fi.path.unwrap_or_default(),
            ufs_path: fi.ufs_path.unwrap_or_default(),
            length: fi.length.unwrap_or(0),
            block_size_bytes: fi.block_size_bytes.unwrap_or(0),
            block_ids: fi.block_ids,
            creation_time_ms: fi.creation_time_ms.unwrap_or(0),
            last_modification_time_ms: fi.last_modification_time_ms.unwrap_or(0),
            last_access_time_ms: fi.last_access_time_ms.unwrap_or(0),
            completed: fi.completed.unwrap_or(false),
            folder: fi.folder.unwrap_or(false),
            cacheable: fi.cacheable.unwrap_or(false),
            persisted: fi.persisted.unwrap_or(false),
            mount_point: fi.mount_point.unwrap_or(false),
            in_goose_fs_percentage: fi.in_goose_fs_percentage.unwrap_or(0),
            in_memory_percentage: fi.in_memory_percentage.unwrap_or(0),
            owner: fi.owner.unwrap_or_default(),
            group: fi.group.unwrap_or_default(),
            mode: fi.mode.unwrap_or(0),
            persistence_state: fi.persistence_state.unwrap_or_default(),
            mount_id: fi.mount_id.unwrap_or(0),
            ufs_fingerprint: fi.ufs_fingerprint.unwrap_or_default(),
            xattr: fi.xattr,
            symlink: fi.symlink,
            block_infos,
        }
    }

    /// Returns the block id carried by `fbi`, or `None` when either the
    /// nested `block_info` or its `block_id` is absent.
    fn block_id_of(fbi: &FileBlockInfo) -> Option<i64> {
        fbi.block_info.as_ref().and_then(|bi| bi.block_id)
    }

    /// Returns `true` when a positive block size is known and `offset` lies
    /// in `0..length`, i.e. when the offset helpers can produce a meaningful
    /// answer.
    fn offset_is_addressable(&self, offset: i64) -> bool {
        self.block_size_bytes > 0 && (0..self.length).contains(&offset)
    }

    /// Returns the `completed` flag.
    #[inline]
    pub fn is_completed(&self) -> bool {
        self.completed
    }

    /// Returns the `folder` flag.
    #[inline]
    pub fn is_folder(&self) -> bool {
        self.folder
    }

    /// Returns `true` if this entry is a folder or is marked completed.
    #[inline]
    pub fn is_readable(&self) -> bool {
        self.folder || self.completed
    }

    /// Returns the `persisted` flag.
    #[inline]
    pub fn is_persisted(&self) -> bool {
        self.persisted
    }

    /// Looks up the stored `FileBlockInfo` for `block_id`, if one is indexed.
    #[inline]
    pub fn get_block_info(&self, block_id: i64) -> Option<&FileBlockInfo> {
        self.block_infos.get(&block_id)
    }

    /// Returns `true` if at least one `FileBlockInfo` is indexed.
    #[inline]
    pub fn has_block_infos(&self) -> bool {
        !self.block_infos.is_empty()
    }

    /// Borrows the full block-id → `FileBlockInfo` index.
    #[inline]
    pub fn block_infos(&self) -> &HashMap<i64, FileBlockInfo> {
        &self.block_infos
    }

    /// Inserts `fbi` into the index under its block id, replacing any entry
    /// already stored for that id.
    ///
    /// An `fbi` that carries no block id is silently ignored; `block_ids` is
    /// never modified by this method.
    pub fn add_block_info(&mut self, fbi: FileBlockInfo) {
        if let Some(id) = Self::block_id_of(&fbi) {
            self.block_infos.insert(id, fbi);
        }
    }

    /// Returns the number of entries in `block_ids`.
    #[inline]
    pub fn block_count(&self) -> usize {
        self.block_ids.len()
    }

    /// Maps a byte `offset` within the file to the zero-based index of the
    /// block that contains it.
    ///
    /// Returns `None` when `block_size_bytes` is not positive or when
    /// `offset` is outside `0..length`. Negative offsets are rejected
    /// explicitly: casting a negative quotient to `usize` would otherwise
    /// wrap around to a huge bogus index.
    pub fn block_index_for_offset(&self, offset: i64) -> Option<usize> {
        if !self.offset_is_addressable(offset) {
            return None;
        }
        // Both operands are non-negative here, so the quotient is too and the
        // cast cannot wrap a negative value.
        Some((offset / self.block_size_bytes) as usize)
    }

    /// Maps a byte `offset` within the file to its position inside the
    /// containing block.
    ///
    /// Returns `None` under the same conditions as
    /// `block_index_for_offset`; in particular a negative `offset` yields
    /// `None` rather than a negative remainder.
    pub fn offset_in_block(&self, offset: i64) -> Option<i64> {
        if !self.offset_is_addressable(offset) {
            return None;
        }
        Some(offset % self.block_size_bytes)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::grpc::{BlockInfo, WorkerNetAddress};

    /// Block size used by the fixtures: 64 MiB.
    const BLOCK_BYTES: i64 = 64 * 1024 * 1024;

    /// Builds a `FileBlockInfo` for `block_id` at `offset` with a single
    /// fixed UFS location.
    fn make_file_block_info(block_id: i64, offset: i64) -> FileBlockInfo {
        let location = WorkerNetAddress {
            host: Some("worker1".to_string()),
            rpc_port: Some(9203),
            ..Default::default()
        };
        let block = BlockInfo {
            block_id: Some(block_id),
            length: Some(64 * 1024 * 1024),
            max_replicas: Some(1),
        };
        FileBlockInfo {
            block_info: Some(block),
            offset: Some(offset),
            ufs_locations: vec![location],
            ufs_string_locations: vec![],
        }
    }

    /// Builds the two-block, 128 MiB `FileInfo` shared by most tests.
    fn make_file_info() -> FileInfo {
        let blocks = vec![
            make_file_block_info(1001, 0),
            make_file_block_info(1002, BLOCK_BYTES),
        ];
        let xattr = HashMap::from([("innerWriteType".to_string(), b"CACHE_THROUGH".to_vec())]);
        FileInfo {
            file_id: Some(42),
            name: Some("hello.txt".to_string()),
            path: Some("/data/hello.txt".to_string()),
            ufs_path: Some("cos://bucket/hello.txt".to_string()),
            length: Some(128 * 1024 * 1024),
            block_size_bytes: Some(BLOCK_BYTES),
            creation_time_ms: Some(1_700_000_000_000),
            completed: Some(true),
            folder: Some(false),
            cacheable: Some(true),
            persisted: Some(true),
            block_ids: vec![1001, 1002],
            last_modification_time_ms: Some(1_700_000_001_000),
            owner: Some("alice".to_string()),
            group: Some("staff".to_string()),
            mode: Some(0o644),
            persistence_state: Some("PERSISTED".to_string()),
            mount_id: Some(7),
            in_goose_fs_percentage: Some(100),
            in_memory_percentage: Some(50),
            file_block_infos: blocks,
            xattr,
            ..Default::default()
        }
    }

    /// Shorthand: the shared fixture already converted to a `URIStatus`.
    fn sample_status() -> URIStatus {
        URIStatus::from_proto(make_file_info())
    }

    #[test]
    fn test_from_proto_basic_fields() {
        let st = sample_status();

        assert_eq!(st.file_id, 42);
        assert_eq!(st.name, "hello.txt");
        assert_eq!(st.path, "/data/hello.txt");
        assert_eq!(st.ufs_path, "cos://bucket/hello.txt");
        assert_eq!(st.length, 128 * 1024 * 1024);
        assert_eq!(st.block_size_bytes, BLOCK_BYTES);
        assert_eq!(st.block_ids, vec![1001, 1002]);
        assert!(st.completed);
        assert!(!st.folder);
        assert!(st.cacheable);
        assert!(st.persisted);
        assert_eq!(st.owner, "alice");
        assert_eq!(st.group, "staff");
        assert_eq!(st.mode, 0o644);
        assert_eq!(st.in_goose_fs_percentage, 100);
        assert_eq!(st.in_memory_percentage, 50);
        assert_eq!(st.mount_id, 7);
    }

    #[test]
    fn test_block_info_map_built_correctly() {
        let st = sample_status();

        assert!(st.has_block_infos());
        assert_eq!(st.block_infos().len(), 2);

        let first = st.get_block_info(1001).expect("block 1001 missing");
        assert_eq!(first.offset, Some(0));
        assert_eq!(first.block_info.as_ref().unwrap().block_id, Some(1001));

        let second = st.get_block_info(1002).expect("block 1002 missing");
        assert_eq!(second.offset, Some(BLOCK_BYTES));
    }

    #[test]
    fn test_get_block_info_missing_returns_none() {
        let st = sample_status();
        assert!(st.get_block_info(9999).is_none());
    }

    #[test]
    fn test_add_block_info() {
        let mut st = sample_status();
        st.add_block_info(make_file_block_info(1003, 128 * 1024 * 1024));
        assert!(st.get_block_info(1003).is_some());
    }

    #[test]
    fn test_add_block_info_without_block_id_is_noop() {
        let mut st = sample_status();
        let before = st.block_infos().len();

        // No nested `block_info`, hence no id to index the entry under.
        let without_id = FileBlockInfo {
            block_info: None,
            offset: Some(0),
            ufs_locations: vec![],
            ufs_string_locations: vec![],
        };
        st.add_block_info(without_id);

        assert_eq!(st.block_infos().len(), before);
    }

    #[test]
    fn test_is_readable_completed_file() {
        let st = sample_status();
        assert!(st.is_readable());
    }

    #[test]
    fn test_is_readable_folder() {
        let st = URIStatus::from_proto(FileInfo {
            completed: Some(false),
            folder: Some(true),
            ..Default::default()
        });
        assert!(st.is_readable());
    }

    #[test]
    fn test_is_not_readable_incomplete_file() {
        let st = URIStatus::from_proto(FileInfo {
            completed: Some(false),
            folder: Some(false),
            ..Default::default()
        });
        assert!(!st.is_readable());
    }

    #[test]
    fn test_block_index_for_offset() {
        let st = sample_status();

        assert_eq!(st.block_index_for_offset(0), Some(0));
        assert_eq!(st.block_index_for_offset(BLOCK_BYTES - 1), Some(0));
        assert_eq!(st.block_index_for_offset(BLOCK_BYTES), Some(1));
        assert_eq!(st.block_index_for_offset(128 * 1024 * 1024 - 1), Some(1));
        // One past the last byte is out of range.
        assert_eq!(st.block_index_for_offset(128 * 1024 * 1024), None);
    }

    #[test]
    fn test_offset_in_block() {
        let st = sample_status();

        assert_eq!(st.offset_in_block(0), Some(0));
        assert_eq!(st.offset_in_block(100), Some(100));
        assert_eq!(st.offset_in_block(BLOCK_BYTES), Some(0));
        assert_eq!(st.offset_in_block(BLOCK_BYTES + 100), Some(100));
    }

    #[test]
    fn test_xattr_preserved() {
        let st = sample_status();
        let val = st.xattr.get("innerWriteType").expect("xattr missing");
        assert_eq!(val, b"CACHE_THROUGH");
    }

    #[test]
    fn test_empty_file_info_defaults() {
        let st = URIStatus::from_proto(FileInfo::default());

        assert_eq!(st.file_id, 0);
        assert_eq!(st.name, "");
        assert_eq!(st.path, "");
        assert_eq!(st.length, 0);
        assert!(!st.completed);
        assert!(!st.folder);
        assert!(!st.is_readable());
        assert_eq!(st.block_count(), 0);
        assert!(!st.has_block_infos());
    }
}