use std::io::SeekFrom;
use std::sync::Arc;
use bytes::{Bytes, BytesMut};
use tracing::{debug, warn};
use crate::block::router::WorkerRouter;
use crate::client::{WorkerClient, WorkerClientPool, WorkerManagerClient};
use crate::config::GoosefsConfig;
use crate::context::FileSystemContext;
use crate::error::{Error, Result};
use crate::fs::options::InStreamOptions;
use crate::fs::uri_status::URIStatus;
use crate::io::reader::GrpcBlockReader;
use crate::proto::proto::dataserver::OpenUfsBlockOptions;
/// Seek distance, in bytes, that `GoosefsFileInStream::seek` uses to classify a
/// move as "short".
///
/// For short moves within the current block, `seek` may keep the open block
/// reader instead of discarding it. Longer moves always discard it.
///
/// NOTE(review): the name suggests a cutoff for switching to positioned reads,
/// but within this file it is only consulted by `seek` — confirm intent.
pub const TRANSFER_POSITIONED_READ_THRESHOLD: i64 = 8 * 1024;
/// Not referenced anywhere in this file, hence `allow(dead_code)`. Presumably
/// reserved for a future prefetch feature — TODO confirm.
#[allow(dead_code)]
const MAX_PREFETCH_WINDOW: i32 = 8;
/// Sequential and positioned reader over a single GooseFS file.
///
/// There are two constructors:
/// - `open` is standalone: it makes its own master and worker-manager
///   connections and has no worker pool.
/// - `open_with_context` reuses a `FileSystemContext`'s master, a snapshot of
///   its router's worker list, and its worker pool.
///
/// `read`/`read_all` stream through blocks using a lazily opened
/// `GrpcBlockReader`. `read_at` issues positioned reads and does not move the
/// cursor.
pub struct GoosefsFileInStream {
    /// File metadata fetched from the master when the stream was opened.
    status: URIStatus,
    /// Client configuration. `chunk_size` is passed to every block read.
    config: GoosefsConfig,
    /// Input-stream options. `max_ufs_read_concurrency` is forwarded in the UFS
    /// block options.
    options: InStreamOptions,
    /// Cursor, in bytes from the start of the file. `read` advances it and
    /// `seek` sets it.
    pos: i64,
    /// File length in bytes, copied from `status.length` at open time.
    file_length: i64,
    /// Streaming reader for the block under the cursor, opened lazily by `read`.
    /// `None` when no reader is open.
    block_in_stream: Option<GrpcBlockReader>,
    /// Block id served by `block_in_stream`, or -1 when no reader is open.
    block_in_stream_block_id: i64,
    /// Initialized to -1 and never read or updated in this file (hence
    /// `allow(dead_code)`). Presumably reserved for positioned-read caching — TODO confirm.
    #[allow(dead_code)]
    cached_positioned_block_id: i64,
    /// Per-stream router used to choose a worker for each block.
    router: WorkerRouter,
    /// Shared worker-client pool (set by `open_with_context`). `None` for streams
    /// created by `open`, which connect to workers directly.
    worker_pool: Option<Arc<WorkerClientPool>>,
}
impl GoosefsFileInStream {
pub async fn open(
config: &GoosefsConfig,
path: &str,
options: crate::fs::options::OpenFileOptions,
) -> Result<Self> {
use crate::client::MasterClient;
config
.validate()
.map_err(|e| Error::ConfigError { message: e })?;
let master = MasterClient::connect(config).await?;
let file_info = master.get_status(path).await?;
let status = URIStatus::from_proto(file_info);
if status.is_folder() {
return Err(Error::OpenDirectory {
path: path.to_string(),
});
}
if !status.is_completed() {
return Err(Error::FileIncomplete {
message: format!("{path} is incomplete"),
});
}
let inquire_client = master.inquire_client().clone();
let wm = WorkerManagerClient::connect_with_inquire(config, inquire_client).await?;
let workers = wm.get_worker_info_list().await?;
if workers.is_empty() {
return Err(Error::NoWorkerAvailable {
message: "no workers available for reading".to_string(),
});
}
let router = WorkerRouter::new();
router.update_workers(workers).await;
let file_length = status.length;
debug!(
path = %path,
file_length = file_length,
block_count = status.block_ids.len(),
"GoosefsFileInStream opened"
);
Ok(Self {
file_length,
status,
config: config.clone(),
options: options.in_stream_options,
pos: 0,
block_in_stream: None,
block_in_stream_block_id: -1,
cached_positioned_block_id: -1,
router,
worker_pool: None, })
}
pub async fn open_with_context(
ctx: Arc<FileSystemContext>,
path: &str,
options: crate::fs::options::OpenFileOptions,
) -> Result<Self> {
let config = ctx.config().clone();
config
.validate()
.map_err(|e| Error::ConfigError { message: e })?;
let master = ctx.acquire_master();
let file_info = master.get_status(path).await?;
let status = URIStatus::from_proto(file_info);
if status.is_folder() {
return Err(Error::OpenDirectory {
path: path.to_string(),
});
}
if !status.is_completed() {
return Err(Error::FileIncomplete {
message: format!("{path} is incomplete"),
});
}
let shared_router = ctx.acquire_router();
let router = WorkerRouter::with_ttls(
std::time::Duration::from_secs(60),
std::time::Duration::from_secs(30),
);
let workers = shared_router.get_workers().await;
router.update_workers((*workers).clone()).await;
let file_length = status.length;
let worker_pool = ctx.acquire_worker_pool();
debug!(
path = %path,
file_length = file_length,
block_count = status.block_ids.len(),
"GoosefsFileInStream opened (context mode)"
);
Ok(Self {
file_length,
status,
config,
options: options.in_stream_options,
pos: 0,
block_in_stream: None,
block_in_stream_block_id: -1,
cached_positioned_block_id: -1,
router,
worker_pool: Some(worker_pool),
})
}
pub fn pos(&self) -> i64 {
self.pos
}
pub fn len(&self) -> i64 {
self.file_length
}
pub fn is_empty(&self) -> bool {
self.file_length == 0
}
pub fn is_eof(&self) -> bool {
self.pos >= self.file_length
}
pub fn remaining(&self) -> i64 {
(self.file_length - self.pos).max(0)
}
pub async fn seek(&mut self, pos: i64) -> Result<i64> {
let target = pos.clamp(0, self.file_length);
if target == self.pos {
return Ok(self.pos);
}
let seek_dist = (target - self.pos).abs();
let same_block = self.block_index_for_pos(target) == self.block_index_for_pos(self.pos);
if seek_dist < TRANSFER_POSITIONED_READ_THRESHOLD && same_block {
if let Some(ref mut stream) = self.block_in_stream {
if target > self.pos {
let skip = (target - self.pos) as usize;
Self::skip_bytes(stream, skip).await?;
} else {
self.block_in_stream = None;
self.block_in_stream_block_id = -1;
}
}
} else {
self.block_in_stream = None;
self.block_in_stream_block_id = -1;
}
self.pos = target;
Ok(self.pos)
}
pub async fn seek_from(&mut self, seek_from: SeekFrom) -> Result<i64> {
let target = match seek_from {
SeekFrom::Start(n) => n as i64,
SeekFrom::End(n) => self.file_length + n,
SeekFrom::Current(n) => self.pos + n,
};
self.seek(target).await
}
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
if self.is_eof() || buf.is_empty() {
return Ok(0);
}
let block_idx = self.block_index_for_pos(self.pos);
let block_id = self.block_id_at(block_idx)?;
if self.block_in_stream_block_id != block_id {
let offset_in_block = self.offset_in_block(self.pos);
let remaining_in_block = self.remaining_in_block(self.pos);
let worker = self.connect_worker(block_id).await?;
let worker_generation = worker.generation();
let ufs_opts = self.build_ufs_opts(block_idx);
let reader_result = GrpcBlockReader::open(
&worker,
block_id,
offset_in_block,
remaining_in_block,
self.config.chunk_size as i64,
ufs_opts.clone(),
)
.await;
let reader = match reader_result {
Ok(r) => r,
Err(e) if e.is_authentication_failed() => {
debug!(
block_id = block_id,
stale_generation = worker_generation,
error = %e,
"auth failed on block reader open, requesting single-flight reconnect"
);
let fresh = self
.reconnect_worker_for_block(block_id, Some(worker_generation))
.await?;
GrpcBlockReader::open(
&fresh,
block_id,
offset_in_block,
remaining_in_block,
self.config.chunk_size as i64,
ufs_opts,
)
.await?
}
Err(e) => return Err(e),
};
self.block_in_stream = Some(reader);
self.block_in_stream_block_id = block_id;
}
let n = self.read_from_sequential_stream(buf).await?;
if n > 0 {
self.pos += n as i64;
}
Ok(n)
}
pub async fn read_at(&mut self, offset: i64, n: usize) -> Result<Bytes> {
if offset >= self.file_length || n == 0 {
return Ok(Bytes::new());
}
let end = (offset + n as i64).min(self.file_length);
let mut result = BytesMut::with_capacity((end - offset) as usize);
let mut cur = offset;
while cur < end {
let block_idx = self.block_index_for_pos(cur);
let block_id = self.block_id_at(block_idx)?;
let offset_in_block = self.offset_in_block(cur);
let block_end = self.block_start(block_idx) + self.status.block_size_bytes;
let read_end = end.min(block_end);
let length = read_end - cur;
let worker = self.connect_worker(block_id).await?;
let worker_generation = worker.generation();
let ufs_opts = self.build_ufs_opts(block_idx);
let read_result = GrpcBlockReader::positioned_read(
&worker,
block_id,
offset_in_block,
length,
self.config.chunk_size as i64,
ufs_opts.clone(),
)
.await;
let data = match read_result {
Ok(d) => d,
Err(e) if e.is_authentication_failed() => {
debug!(
block_id = block_id,
stale_generation = worker_generation,
error = %e,
"auth failed on positioned read, requesting single-flight reconnect"
);
let fresh = self
.reconnect_worker_for_block(block_id, Some(worker_generation))
.await?;
GrpcBlockReader::positioned_read(
&fresh,
block_id,
offset_in_block,
length,
self.config.chunk_size as i64,
ufs_opts,
)
.await?
}
Err(e) => return Err(e),
};
result.extend_from_slice(&data);
cur += length;
}
Ok(result.freeze())
}
pub async fn read_all(&mut self) -> Result<Bytes> {
let remaining = self.remaining() as usize;
let mut buf = BytesMut::with_capacity(remaining);
let mut tmp = vec![0u8; (self.config.chunk_size as usize).min(65536)];
loop {
let n = self.read(&mut tmp).await?;
if n == 0 {
break;
}
buf.extend_from_slice(&tmp[..n]);
}
Ok(buf.freeze())
}
fn block_index_for_pos(&self, offset: i64) -> usize {
if self.status.block_size_bytes <= 0 {
return 0;
}
(offset / self.status.block_size_bytes) as usize
}
fn block_start(&self, idx: usize) -> i64 {
idx as i64 * self.status.block_size_bytes
}
fn offset_in_block(&self, offset: i64) -> i64 {
if self.status.block_size_bytes <= 0 {
return offset;
}
offset % self.status.block_size_bytes
}
fn remaining_in_block(&self, offset: i64) -> i64 {
let block_idx = self.block_index_for_pos(offset);
let block_end = self.block_start(block_idx) + self.status.block_size_bytes;
block_end.min(self.file_length) - offset
}
fn block_id_at(&self, block_idx: usize) -> Result<i64> {
if let Some(&id) = self.status.block_ids.get(block_idx) {
if id > 0 {
return Ok(id);
}
}
let id = self
.status
.block_infos()
.values()
.find(|fbi| {
fbi.offset
.is_some_and(|off| off == self.block_start(block_idx))
})
.and_then(|fbi| fbi.block_info.as_ref())
.and_then(|bi| bi.block_id)
.unwrap_or(-1);
if id > 0 {
Ok(id)
} else {
Err(Error::Internal {
message: format!("no valid block_id at index {block_idx}"),
source: None,
})
}
}
async fn connect_worker(&mut self, block_id: i64) -> Result<WorkerClient> {
let worker_info = self.router.select_worker(block_id).await?;
let addr = worker_info
.address
.as_ref()
.ok_or_else(|| Error::Internal {
message: "worker has no address".to_string(),
source: None,
})?;
let worker_addr = format!(
"{}:{}",
addr.host.as_deref().unwrap_or("127.0.0.1"),
addr.rpc_port.unwrap_or(9203)
);
let result = if let Some(pool) = &self.worker_pool {
pool.acquire(&worker_addr).await
} else {
WorkerClient::connect(&worker_addr, &self.config).await
};
match result {
Ok(w) => Ok(w),
Err(e) => {
if matches!(e, Error::AuthenticationFailed { .. }) {
debug!(
worker = %worker_addr,
error = %e,
"authentication failed on acquire, reconnecting with fresh credentials"
);
if let Some(pool) = &self.worker_pool {
return pool.reconnect(&worker_addr).await;
}
return WorkerClient::connect(&worker_addr, &self.config).await;
}
self.router.mark_failed(addr);
if let Some(pool) = &self.worker_pool {
pool.invalidate(&worker_addr).await;
}
warn!(worker = %worker_addr, error = %e, "worker connect failed, retrying");
let retry_info = self.router.select_worker(block_id).await?;
let retry_addr_info =
retry_info.address.as_ref().ok_or_else(|| Error::Internal {
message: "retry worker has no address".to_string(),
source: None,
})?;
let retry_addr = format!(
"{}:{}",
retry_addr_info.host.as_deref().unwrap_or("127.0.0.1"),
retry_addr_info.rpc_port.unwrap_or(9203)
);
if let Some(pool) = &self.worker_pool {
pool.acquire(&retry_addr).await
} else {
WorkerClient::connect(&retry_addr, &self.config).await
}
}
}
}
async fn reconnect_worker_for_block(
&mut self,
block_id: i64,
stale_generation: Option<u64>,
) -> Result<WorkerClient> {
let worker_info = self.router.select_worker(block_id).await?;
let addr = worker_info
.address
.as_ref()
.ok_or_else(|| Error::Internal {
message: "worker has no address".to_string(),
source: None,
})?;
let worker_addr = format!(
"{}:{}",
addr.host.as_deref().unwrap_or("127.0.0.1"),
addr.rpc_port.unwrap_or(9203)
);
if let Some(pool) = &self.worker_pool {
match stale_generation {
Some(gen) => pool.reconnect_if_stale(&worker_addr, gen).await,
None => pool.reconnect(&worker_addr).await,
}
} else {
WorkerClient::connect(&worker_addr, &self.config).await
}
}
fn build_ufs_opts(&self, block_idx: usize) -> Option<OpenUfsBlockOptions> {
let ufs_path = self.status.ufs_path.as_str();
if ufs_path.is_empty() {
return None;
}
let block_size = self.status.block_size_bytes;
let offset_in_file = block_idx as i64 * block_size;
Some(OpenUfsBlockOptions {
ufs_path: Some(ufs_path.to_string()),
offset_in_file: Some(offset_in_file),
block_size: Some(block_size),
max_ufs_read_concurrency: Some(self.options.max_ufs_read_concurrency),
mount_id: Some(self.status.mount_id),
no_cache: Some(!self.status.cacheable),
user: None,
caller_type: None,
})
}
    /// Pulls chunks from `stream` until at least `skip` bytes have been consumed,
    /// or the stream ends. An early end is not treated as an error.
    ///
    /// NOTE(review): each chunk is consumed in full. When the last chunk pulled
    /// is larger than the bytes still left to skip, its unread tail is dropped.
    /// The reader is then positioned *past* the requested offset by that many
    /// bytes. Callers must not assume the reader advanced by exactly `skip`
    /// bytes.
    async fn skip_bytes(stream: &mut GrpcBlockReader, mut skip: usize) -> Result<()> {
        while skip > 0 {
            let chunk = stream.read_chunk().await?;
            match chunk {
                Some(data) => {
                    // Only counts toward `skip`; any excess in `data` is discarded.
                    let consumed = data.len().min(skip);
                    skip -= consumed;
                }
                None => break,
            }
        }
        Ok(())
    }
async fn read_from_sequential_stream(&mut self, buf: &mut [u8]) -> Result<usize> {
let stream = match self.block_in_stream.as_mut() {
Some(s) => s,
None => return Ok(0),
};
match stream.read_chunk().await? {
Some(data) => {
let n = data.len().min(buf.len());
buf[..n].copy_from_slice(&data[..n]);
if data.len() > buf.len() {
warn!(
chunk_len = data.len(),
buf_len = buf.len(),
"chunk larger than buffer — excess bytes discarded"
);
}
if stream.is_complete() {
self.block_in_stream = None;
self.block_in_stream_block_id = -1;
}
Ok(n)
}
None => {
self.block_in_stream = None;
self.block_in_stream_block_id = -1;
Ok(0)
}
}
}
}
// Unit tests for the block/offset arithmetic helpers and the simple accessors.
// Streams are built from a struct literal (`make_stream`) rather than through
// `open`, so no master or worker-manager RPCs are issued.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs::uri_status::URIStatus;
    use crate::proto::grpc::file::FileInfo;
    /// Builds a completed, non-folder status for a file of `length` bytes with
    /// `block_size`-byte blocks. Block ids are sequential from 1001 and the UFS
    /// path is empty.
    fn make_status(length: i64, block_size: i64) -> URIStatus {
        // Ceiling division, so a trailing partial block still gets an id.
        let num_blocks = (length + block_size - 1) / block_size;
        let block_ids: Vec<i64> = (1001..(1001 + num_blocks)).collect();
        let fi = FileInfo {
            length: Some(length),
            block_size_bytes: Some(block_size),
            block_ids: block_ids.clone(),
            completed: Some(true),
            folder: Some(false),
            ufs_path: Some(String::new()), ..Default::default()
        };
        URIStatus::from_proto(fi)
    }
    /// Wraps `status` in a legacy-mode stream (no worker pool, no open reader,
    /// cursor at 0).
    fn make_stream(status: URIStatus) -> GoosefsFileInStream {
        let config = crate::config::GoosefsConfig::new("127.0.0.1:9200");
        let file_length = status.length;
        GoosefsFileInStream {
            file_length,
            status,
            config,
            options: InStreamOptions::default(),
            pos: 0,
            block_in_stream: None,
            block_in_stream_block_id: -1,
            cached_positioned_block_id: -1,
            router: WorkerRouter::new(),
            worker_pool: None,
        }
    }
    // Offsets map to blocks by integer division; block boundaries start new blocks.
    #[test]
    fn test_block_index_calculation() {
        let bs = 64 * 1024 * 1024i64;
        let len = 3 * bs;
        let status = make_status(len, bs);
        let stream = make_stream(status);
        assert_eq!(stream.block_index_for_pos(0), 0);
        assert_eq!(stream.block_index_for_pos(bs - 1), 0);
        assert_eq!(stream.block_index_for_pos(bs), 1);
        assert_eq!(stream.block_index_for_pos(2 * bs), 2);
    }
    // Offsets within a block restart at 0 at each block boundary.
    #[test]
    fn test_offset_in_block() {
        let bs = 64 * 1024 * 1024i64;
        let status = make_status(2 * bs, bs);
        let stream = make_stream(status);
        assert_eq!(stream.offset_in_block(0), 0);
        assert_eq!(stream.offset_in_block(100), 100);
        assert_eq!(stream.offset_in_block(bs), 0);
        assert_eq!(stream.offset_in_block(bs + 42), 42);
    }
    // Remaining bytes run to the end of the containing block.
    #[test]
    fn test_remaining_in_block() {
        let bs = 64 * 1024 * 1024i64;
        let status = make_status(2 * bs, bs);
        let stream = make_stream(status);
        assert_eq!(stream.remaining_in_block(0), bs);
        assert_eq!(stream.remaining_in_block(bs - 100), 100);
        assert_eq!(stream.remaining_in_block(bs), bs);
    }
    // Ids come from `block_ids` by index; an out-of-range index is an error.
    #[test]
    fn test_block_id_at() {
        let bs = 64 * 1024 * 1024i64;
        let status = make_status(2 * bs, bs);
        let stream = make_stream(status);
        assert_eq!(stream.block_id_at(0).unwrap(), 1001);
        assert_eq!(stream.block_id_at(1).unwrap(), 1002);
        assert!(stream.block_id_at(99).is_err()); }
    // EOF is reached exactly when the cursor equals the file length.
    #[test]
    fn test_is_eof() {
        let bs = 1024i64;
        let status = make_status(bs, bs);
        let mut stream = make_stream(status);
        assert!(!stream.is_eof());
        stream.pos = bs;
        assert!(stream.is_eof());
    }
    // `remaining` shrinks as the cursor advances and bottoms out at 0.
    #[test]
    fn test_remaining() {
        let bs = 1024i64;
        let status = make_status(bs, bs);
        let mut stream = make_stream(status);
        assert_eq!(stream.remaining(), bs);
        stream.pos = 100;
        assert_eq!(stream.remaining(), bs - 100);
        stream.pos = bs;
        assert_eq!(stream.remaining(), 0);
    }
    // Streams built without a context carry no worker pool.
    #[test]
    fn test_legacy_mode_no_pool() {
        let bs = 1024i64;
        let status = make_status(bs, bs);
        let stream = make_stream(status);
        assert!(
            stream.worker_pool.is_none(),
            "legacy mode should have no pool"
        );
    }
}