use std::sync::Arc;
use bytes::{Bytes, BytesMut};
use tracing::{debug, warn};
use crate::block::mapper::{BlockMapper, BlockReadPlan};
use crate::block::router::WorkerRouter;
use crate::client::worker::WorkerClientPool;
use crate::client::WorkerClient;
use crate::config::GoosefsConfig;
use crate::context::FileSystemContext;
use crate::error::{Error, Result};
use crate::io::reader::GrpcBlockReader;
use crate::proto::grpc::file::FileInfo;
use crate::proto::proto::dataserver::OpenUfsBlockOptions;
/// Reader for one GooseFS file (or a byte range of it) that fetches data one
/// planned block segment at a time from workers.
///
/// Instances are created by `open_with_context` / `open_range_with_context`
/// and consumed through `read_next_block` or `read_all`.
pub struct GoosefsFileReader {
/// Client configuration cloned from the context; supplies `chunk_size` for
/// block reads and is passed to `WorkerClient::connect` when no pool is set.
config: GoosefsConfig,
/// Path of the file as given by the caller when the reader was opened.
path: String,
/// File metadata returned by the master's `get_status` call.
file_info: FileInfo,
/// Reader-private router, seeded with a copy of the context's worker list.
router: WorkerRouter,
/// Pool used to acquire/reconnect worker clients; when `None`, a direct
/// `WorkerClient::connect` is used instead.
worker_pool: Option<Arc<WorkerClientPool>>,
/// Handle to the originating context. It is never read in this file —
/// presumably held only to keep the context alive (TODO confirm).
_context: Option<Arc<FileSystemContext>>,
/// Read plan produced by `BlockMapper::plan_read` for `offset`/`length`.
plans: Vec<BlockReadPlan>,
/// Index into `plans` of the next segment to read.
current_plan_index: usize,
/// Running total of bytes returned by successful block reads.
total_bytes_read: u64,
/// Start offset of the requested range, as passed at open time.
offset: u64,
/// Length of the requested range, as passed at open time.
length: u64,
}
impl GoosefsFileReader {
/// Opens `path` for a whole-file read using the shared `ctx`.
///
/// File metadata and the worker list are obtained through
/// `init_with_context`; the read plan then covers `[0, file length)`.
///
/// A missing length is treated as `0`. A negative length is clamped to `0`
/// as well, rather than being sign-wrapped into an enormous `u64` by the
/// cast.
///
/// # Errors
///
/// Propagates any error from `init_with_context` (metadata lookup failure,
/// or no workers available).
pub async fn open_with_context(ctx: Arc<FileSystemContext>, path: &str) -> Result<Self> {
    let (file_info, router) = Self::init_with_context(&ctx, path).await?;
    // Clamp before casting: a negative value cast with `as u64` wraps around.
    let file_length = file_info.length.unwrap_or(0).max(0) as u64;
    let config = ctx.config().clone();
    let pool = Some(ctx.acquire_worker_pool());
    Self::build(
        &config,
        path,
        file_info,
        router,
        pool,
        Some(ctx),
        0,
        file_length,
    )
}
/// Opens `path` for reading the byte range described by `offset` and
/// `length`, using the shared `ctx`.
///
/// The range is handed to the read planner unchanged; this function itself
/// performs no validation of it against the file length.
///
/// # Errors
///
/// Propagates any error from `init_with_context`.
pub async fn open_range_with_context(
    ctx: Arc<FileSystemContext>,
    path: &str,
    offset: u64,
    length: u64,
) -> Result<Self> {
    let (info, private_router) = Self::init_with_context(&ctx, path).await?;
    // Clone the config so that `ctx` itself can be moved into the reader.
    let cfg = ctx.config().clone();
    let worker_pool = ctx.acquire_worker_pool();
    Self::build(
        &cfg,
        path,
        info,
        private_router,
        Some(worker_pool),
        Some(ctx),
        offset,
        length,
    )
}
/// Gathers what a new reader needs from the shared context.
///
/// Fetches the status of `path` from the master, then copies the worker
/// list held by the context's router into a freshly created `WorkerRouter`.
///
/// # Errors
///
/// Returns the master's error if `get_status` fails, or
/// `Error::NoWorkerAvailable` when the context's worker list is empty.
async fn init_with_context(
    ctx: &Arc<FileSystemContext>,
    path: &str,
) -> Result<(FileInfo, WorkerRouter)> {
    let master_client = ctx.acquire_master();
    let info = master_client.get_status(path).await?;

    let reported_length = info.length.unwrap_or(0);
    if reported_length == 0 {
        // An empty file is not an error; it is only logged.
        debug!(path = %path, "file is empty");
    }
    debug!(
        path = %path,
        file_length = reported_length,
        block_count = info.block_ids.len(),
        block_size = ?info.block_size_bytes,
        "fetched file metadata (via context)"
    );

    let context_router = ctx.acquire_router();
    let known_workers = (*context_router.get_workers().await).clone();
    if known_workers.is_empty() {
        return Err(Error::NoWorkerAvailable {
            message: "no workers available for reading".to_string(),
        });
    }
    debug!(
        worker_count = known_workers.len(),
        "reusing worker list from context"
    );

    // The reader gets its own router instance, populated from the copy above.
    let private_router = WorkerRouter::new();
    private_router.update_workers(known_workers).await;
    Ok((info, private_router))
}
/// Assembles a reader from already-fetched pieces.
///
/// Computes the block read plan for `offset`/`length` with
/// `BlockMapper::plan_read` and starts the reader at the first plan with a
/// zero byte counter. In its current form this never returns `Err`.
#[allow(clippy::too_many_arguments)]
fn build(
    config: &GoosefsConfig,
    path: &str,
    file_info: FileInfo,
    router: WorkerRouter,
    worker_pool: Option<Arc<WorkerClientPool>>,
    context: Option<Arc<FileSystemContext>>,
    offset: u64,
    length: u64,
) -> Result<Self> {
    let read_plans = BlockMapper::plan_read(&file_info, offset, length);
    debug!(
        path = %path,
        offset = offset,
        length = length,
        block_segments = read_plans.len(),
        "read plan created"
    );

    let reader = Self {
        config: config.clone(),
        path: path.to_owned(),
        file_info,
        router,
        worker_pool,
        _context: context,
        plans: read_plans,
        current_plan_index: 0,
        total_bytes_read: 0,
        offset,
        length,
    };
    Ok(reader)
}
/// Reads the next planned block segment and returns its bytes.
///
/// Returns `Ok(None)` once every plan has been consumed. A plan whose
/// resolved block ID is not positive is logged and skipped.
///
/// Failure handling for the selected worker:
/// * connecting fails with an authentication error: the connection is
///   re-established through `reconnect_worker`;
/// * connecting fails for any other reason: the worker is marked failed on
///   the router and one other worker is selected and tried;
/// * the read itself fails with an authentication error: the connection is
///   reconnected (passing the stale generation) and the read is retried once.
///
/// The address used for that last reconnect is always the address of the
/// worker that was actually connected, including after a fail-over.
///
/// # Errors
///
/// Returns an error if worker selection fails, if a worker carries no
/// address, if no connection could be established, or if the block read
/// fails (after the single auth retry, where applicable).
pub async fn read_next_block(&mut self) -> Result<Option<Bytes>> {
    loop {
        if self.current_plan_index >= self.plans.len() {
            return Ok(None);
        }
        let plan = &self.plans[self.current_plan_index];
        let block_id = self.resolve_block_id(plan);
        if block_id <= 0 {
            // NOTE(review): skipping means the caller receives fewer bytes
            // than the plan covers, with only this warning as a signal.
            warn!(
                block_index = plan.block_index,
                plan_block_id = plan.block_id,
                "invalid block ID, skipping block"
            );
            self.current_plan_index += 1;
            continue;
        }

        let worker_info = self.router.select_worker(block_id).await?;
        let addr = worker_info
            .address
            .as_ref()
            .ok_or_else(|| Error::Internal {
                message: "worker has no address".to_string(),
                source: None,
            })?;
        let selected_addr = format!(
            "{}:{}",
            addr.host.as_deref().unwrap_or("127.0.0.1"),
            addr.rpc_port.unwrap_or(9203)
        );
        debug!(
            block_id = block_id,
            block_index = plan.block_index,
            offset_in_block = plan.offset_in_block,
            length = plan.length,
            worker = %selected_addr,
            "reading block"
        );

        // Keep the client together with the address it is connected to, so
        // that a later auth reconnect targets the worker really in use and
        // not the one that was originally selected but failed.
        let connect_result = self.acquire_worker(&selected_addr).await;
        let (worker, worker_addr) = match connect_result {
            Ok(w) => (w, selected_addr),
            Err(e) if e.is_authentication_failed() => {
                debug!(
                    worker = %selected_addr,
                    error = %e,
                    "authentication failed on connect, reconnecting"
                );
                let w = self.reconnect_worker(&selected_addr, None).await?;
                (w, selected_addr)
            }
            Err(e) => {
                self.router.mark_failed(addr);
                warn!(
                    worker = %selected_addr,
                    error = %e,
                    "worker connection failed, trying another worker"
                );
                let retry_worker_info = match self.router.select_worker(block_id).await {
                    Ok(info) => info,
                    // No alternative worker: surface the original connect error.
                    Err(_) => return Err(e),
                };
                let retry_addr_info = retry_worker_info
                    .address
                    .as_ref()
                    .ok_or_else(|| Error::Internal {
                        message: "retry worker has no address".to_string(),
                        source: None,
                    })?;
                let retry_worker_addr = format!(
                    "{}:{}",
                    retry_addr_info.host.as_deref().unwrap_or("127.0.0.1"),
                    retry_addr_info.rpc_port.unwrap_or(9203)
                );
                debug!(retry_worker = %retry_worker_addr, "retrying with different worker");
                let retry_worker = self.acquire_worker(&retry_worker_addr).await?;
                (retry_worker, retry_worker_addr)
            }
        };

        let ufs_options = self.build_ufs_read_options(plan);
        // Captured before the read so a reconnect can tell the pool which
        // generation was observed to be stale.
        let worker_generation = worker.generation();
        let first_attempt = self
            .try_read_block(&worker, block_id, plan, ufs_options.clone())
            .await;
        let (data, reconnected) = match first_attempt {
            Ok(data) => (data, false),
            Err(e) if e.is_authentication_failed() => {
                debug!(
                    block_id = block_id,
                    worker = %worker_addr,
                    stale_generation = worker_generation,
                    error = %e,
                    "auth failed during block read, requesting single-flight reconnect"
                );
                let fresh_worker = self
                    .reconnect_worker(&worker_addr, Some(worker_generation))
                    .await?;
                let data = self
                    .try_read_block(&fresh_worker, block_id, plan, ufs_options)
                    .await?;
                (data, true)
            }
            Err(e) => return Err(e),
        };

        let bytes_read = data.len() as u64;
        self.total_bytes_read += bytes_read;
        self.current_plan_index += 1;
        if reconnected {
            debug!(
                block_id = block_id,
                bytes_read = bytes_read,
                total_read = self.total_bytes_read,
                "block read complete (after auth reconnect)"
            );
        } else {
            debug!(
                block_id = block_id,
                bytes_read = bytes_read,
                total_read = self.total_bytes_read,
                "block read complete"
            );
        }
        return Ok(Some(data));
    }
}
/// Reads every remaining block segment and returns the bytes concatenated.
///
/// The buffer is pre-sized to the sum of the lengths of all plans (including
/// any that were already consumed through `read_next_block`).
///
/// # Errors
///
/// Stops at, and returns, the first error from `read_next_block`.
pub async fn read_all(&mut self) -> Result<Bytes> {
    let planned_bytes: u64 = self.plans.iter().map(|plan| plan.length).sum();
    let mut assembled = BytesMut::with_capacity(planned_bytes as usize);
    loop {
        match self.read_next_block().await? {
            Some(segment) => assembled.extend_from_slice(&segment),
            None => break,
        }
    }
    Ok(assembled.freeze())
}
/// Obtains a client for the worker at `addr`: from the pool when one is
/// configured, otherwise through a direct `WorkerClient::connect`.
async fn acquire_worker(&self, addr: &str) -> Result<WorkerClient> {
    match &self.worker_pool {
        Some(pool) => pool.acquire(addr).await,
        None => WorkerClient::connect(addr, &self.config).await,
    }
}
/// Re-establishes the connection to the worker at `addr`.
///
/// With a pool: `reconnect_if_stale` is used when the caller supplies the
/// generation it observed, `reconnect` otherwise. Without a pool a new
/// direct connection is made and `stale_generation` is ignored.
async fn reconnect_worker(
    &self,
    addr: &str,
    stale_generation: Option<u64>,
) -> Result<WorkerClient> {
    match (self.worker_pool.as_ref(), stale_generation) {
        (Some(pool), Some(generation)) => pool.reconnect_if_stale(addr, generation).await,
        (Some(pool), None) => pool.reconnect(addr).await,
        (None, _) => WorkerClient::connect(addr, &self.config).await,
    }
}
/// Performs one attempt at reading the segment described by `plan` from
/// `worker`, returning all of its bytes.
///
/// Opens a `GrpcBlockReader` for `block_id` at the plan's in-block offset
/// and length, using the configured chunk size, then drains it.
///
/// # Errors
///
/// Returns any error from opening the block reader or from reading it.
async fn try_read_block(
    &self,
    worker: &WorkerClient,
    block_id: i64,
    plan: &BlockReadPlan,
    ufs_options: Option<OpenUfsBlockOptions>,
) -> Result<Bytes> {
    let start_in_block = plan.offset_in_block as i64;
    let segment_len = plan.length as i64;
    let chunk_size = self.config.chunk_size as i64;

    let open_call = GrpcBlockReader::open(
        worker,
        block_id,
        start_in_block,
        segment_len,
        chunk_size,
        ufs_options,
    );
    let mut reader = open_call.await?;
    reader.read_all().await
}
/// Picks the block ID to use for `plan`.
///
/// Prefers the ID recorded in `file_info.file_block_infos` at the plan's
/// block index when that entry exists, carries block info, and the ID is
/// positive; otherwise falls back to `plan.block_id`.
fn resolve_block_id(&self, plan: &BlockReadPlan) -> i64 {
    self.file_info
        .file_block_infos
        .get(plan.block_index as usize)
        .and_then(|entry| entry.block_info.as_ref())
        .and_then(|info| info.block_id)
        .filter(|&candidate| candidate > 0)
        .unwrap_or(plan.block_id)
}
/// Builds the UFS options sent along with a block read for `plan`.
///
/// Returns `None` when the file has no UFS path or the path is empty.
/// Otherwise the block's offset in the file is computed as
/// `block_index * block_size` (block size defaults to 64 MiB when the
/// metadata has none), and `no_cache` is set only when the file is
/// explicitly marked as not cacheable.
fn build_ufs_read_options(&self, plan: &BlockReadPlan) -> Option<OpenUfsBlockOptions> {
    let info = &self.file_info;
    let ufs_path = info.ufs_path.as_ref().filter(|p| !p.is_empty())?;

    let block_size = info.block_size_bytes.unwrap_or(64 * 1024 * 1024);
    let block_start_in_file = plan.block_index as i64 * block_size;
    // Missing `cacheable` counts as cacheable, i.e. `no_cache = false`.
    let bypass_cache = !info.cacheable.unwrap_or(true);

    Some(OpenUfsBlockOptions {
        ufs_path: Some(ufs_path.clone()),
        offset_in_file: Some(block_start_in_file),
        block_size: Some(block_size),
        max_ufs_read_concurrency: None,
        mount_id: info.mount_id,
        no_cache: Some(bypass_cache),
        user: None,
        caller_type: None,
    })
}
/// Convenience wrapper: opens `path` with `open_with_context` and returns
/// the whole file's bytes.
///
/// # Errors
///
/// Propagates errors from opening the reader or from `read_all`.
pub async fn read_file_with_context(ctx: Arc<FileSystemContext>, path: &str) -> Result<Bytes> {
    let mut file_reader = Self::open_with_context(ctx, path).await?;
    let contents = file_reader.read_all().await?;
    Ok(contents)
}
/// Convenience wrapper: opens the range `offset`/`length` of `path` with
/// `open_range_with_context` and returns its bytes.
///
/// # Errors
///
/// Propagates errors from opening the reader or from `read_all`.
pub async fn read_range_with_context(
    ctx: Arc<FileSystemContext>,
    path: &str,
    offset: u64,
    length: u64,
) -> Result<Bytes> {
    let mut range_reader = Self::open_range_with_context(ctx, path, offset, length).await?;
    let contents = range_reader.read_all().await?;
    Ok(contents)
}
/// Returns the path this reader was opened with.
pub fn path(&self) -> &str {
    self.path.as_str()
}
/// Returns the file metadata held by this reader.
pub fn file_info(&self) -> &FileInfo {
&self.file_info
}
/// Returns the file length from the metadata.
///
/// A missing length yields `0`; a negative length is clamped to `0` rather
/// than being sign-wrapped into a huge `u64` by the cast.
pub fn file_length(&self) -> u64 {
    self.file_info.length.unwrap_or(0).max(0) as u64
}
/// Returns the total number of bytes returned by successful block reads so far.
pub fn bytes_read(&self) -> u64 {
self.total_bytes_read
}
/// Returns the number of entries in the read plan (segments to read for the
/// requested range), which is not necessarily the file's total block count.
pub fn block_count(&self) -> usize {
self.plans.len()
}
/// Returns the index, within the read plan, of the next segment to be read.
pub fn current_block_index(&self) -> usize {
self.current_plan_index
}
/// Returns `true` once no planned segment remains to be read.
pub fn is_complete(&self) -> bool {
    self.plans.get(self.current_plan_index).is_none()
}
/// Returns the start offset of the range requested when the reader was opened.
pub fn offset(&self) -> u64 {
self.offset
}
/// Returns the length of the range requested when the reader was opened.
pub fn length(&self) -> u64 {
self.length
}
}