use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use bytes::Bytes;
use tracing::{debug, info, warn};
use uuid::Uuid;
use crate::block::router::WorkerRouter;
use crate::client::master::default_file_mode;
use crate::client::worker::{WorkerClientPool, WriteBlockOptions};
use crate::client::MasterClient;
use crate::config::GoosefsConfig;
use crate::context::FileSystemContext;
use crate::error::{Error, Result};
use crate::fs::options::DeleteOptions;
use crate::io::writer::GrpcBlockWriter;
use crate::proto::grpc::block::RequestType;
use crate::proto::grpc::file::{CreateFilePOptions, FileInfo, FsOpPId};
use crate::proto::proto::dataserver::CreateUfsFileOptions;
/// How a file's bytes are routed, derived from the numeric write type by
/// `resolve_write_strategy`.
#[derive(Clone, Debug)]
struct WriteStrategy {
/// Write data into cache blocks on workers.
cache_stream: bool,
/// Stream data to the under file system through a worker.
ufs_stream: bool,
/// Options sent when opening the UFS stream; `resolve_write_strategy` sets this
/// exactly when `ufs_stream` is true.
create_ufs_file_options: Option<CreateUfsFileOptions>,
/// Ask the master to schedule asynchronous persistence after a successful `close()`.
need_async_persist: bool,
}
/// Maps a numeric write type onto a [`WriteStrategy`].
///
/// Recognised values are `3` (CACHE_THROUGH: cache + UFS), `4` (THROUGH: UFS only) and
/// `5` (ASYNC_THROUGH: cache now, persistence scheduled on close). Any other value,
/// including `None`, selects cache-only writes. UFS creation options are copied from
/// `file_info` only when a UFS stream will be used.
fn resolve_write_strategy(write_type: Option<i32>, file_info: &FileInfo) -> WriteStrategy {
    let (cache_stream, ufs_stream, need_async_persist) = match write_type {
        Some(3) => (true, true, false),
        Some(4) => (false, true, false),
        Some(5) => (true, false, true),
        _ => (true, false, false),
    };
    let create_ufs_file_options = ufs_stream.then(|| CreateUfsFileOptions {
        ufs_path: file_info.ufs_path.clone(),
        owner: file_info.owner.clone(),
        group: file_info.group.clone(),
        mode: file_info.mode,
        mount_id: file_info.mount_id,
        acl: None,
    });
    WriteStrategy {
        cache_stream,
        ufs_stream,
        create_ufs_file_options,
        need_async_persist,
    }
}
/// Splits a UUID into the two signed 64-bit halves carried by [`FsOpPId`].
///
/// The halves come from `Uuid::as_u64_pair` and are reinterpreted bit-for-bit as `i64`.
fn uuid_to_fs_op_pid(id: Uuid) -> FsOpPId {
    let (msb, lsb) = id.as_u64_pair();
    FsOpPId {
        least_significant_bits: Some(lsb as i64),
        most_significant_bits: Some(msb as i64),
    }
}
/// Streaming writer for a single Goosefs file.
///
/// Created with `GoosefsFileWriter::create_with_context`, fed with `write()`, and either
/// finished with `close()` or abandoned with `cancel()`.
pub struct GoosefsFileWriter {
/// Client configuration (block size, chunk size, default write type).
config: GoosefsConfig,
/// Destination path of the file.
path: String,
/// Master client used to create, complete and clean up the file.
master: MasterClient,
/// Writer-local router, seeded with the context's worker list at creation.
router: WorkerRouter,
/// Shared pool of worker clients.
worker_pool: Arc<WorkerClientPool>,
/// Never read in this file; presumably held so the context outlives the writer.
_context: Option<Arc<FileSystemContext>>,
/// Metadata returned by the master's `create_file`.
file_info: FileInfo,
/// In UFS modes: bytes accepted by the UFS stream. Otherwise: bytes in committed
/// cache blocks (the in-progress block is not counted).
total_bytes_written: u64,
/// Random id sent as the operation id of `complete_file`.
operation_id: Uuid,
/// Set by `cancel()`.
cancelled: AtomicBool,
/// Set on entry to `close()`, whether or not the close then succeeds.
closed: AtomicBool,
/// Routing of bytes to cache and/or UFS, derived from the write type.
write_strategy: WriteStrategy,
/// Ids of cache blocks that were flushed and closed successfully, in order.
committed_block_ids: Vec<i64>,
/// Cache block currently being filled, if any.
current_block_writer: Option<ActiveBlockWriter>,
/// UFS stream, opened lazily by the first UFS write.
ufs_stream: Option<GrpcBlockWriter>,
/// `host:port` of the worker serving `ufs_stream`.
ufs_worker_addr: Option<String>,
/// Set once `close()` has closed the UFS stream successfully.
ufs_stream_completed: AtomicBool,
}
impl GoosefsFileWriter {
/// Creates `path` on the master and returns a writer for it.
///
/// Unset fields of `options` (or all of them when `options` is `None`) are filled from
/// the context configuration: `recursive` defaults to `true`, `block_size_bytes` (also
/// when `0`) to `config.block_size`, `mode` to `default_file_mode()` and `write_type`
/// to `config.write_type`. The write strategy is derived from the same write type the
/// master receives.
///
/// # Errors
///
/// Returns [`Error::NoWorkerAvailable`] when the context knows no workers. This is
/// checked *before* the file is created, so the failure does not leave an incomplete
/// file behind. Also propagates any error from the master's `create_file`.
pub async fn create_with_context(
    ctx: Arc<FileSystemContext>,
    path: &str,
    options: Option<CreateFilePOptions>,
) -> Result<Self> {
    let config = ctx.config().clone();
    let master_arc = ctx.acquire_master();
    let router_arc = ctx.acquire_router();
    let worker_pool = ctx.acquire_worker_pool();

    // Both cache blocks and the UFS stream go through workers, so every write type
    // needs one. Check first so this failure cannot orphan a freshly created file.
    let workers = (*router_arc.get_workers().await).clone();
    if workers.is_empty() {
        return Err(Error::NoWorkerAvailable {
            message: "no workers available for writing".to_string(),
        });
    }

    let mut create_options = options.unwrap_or_default();
    if create_options.recursive.is_none() {
        create_options.recursive = Some(true);
    }
    if matches!(create_options.block_size_bytes, None | Some(0)) {
        create_options.block_size_bytes = Some(config.block_size as i64);
    }
    if create_options.mode.is_none() {
        create_options.mode = Some(default_file_mode());
    }
    // Send the configured default to the master too, so the master and this client
    // agree on the write type.
    if create_options.write_type.is_none() {
        create_options.write_type = config.write_type;
    }
    // Read before `create_options` is moved into `create_file`.
    let effective_write_type = create_options.write_type;

    let file_info = master_arc.create_file(path, create_options).await?;
    debug!(
        path = %path,
        file_id = ?file_info.file_id,
        "file created on Master (via context)"
    );
    let write_strategy = resolve_write_strategy(effective_write_type, &file_info);

    let router = WorkerRouter::new();
    router.update_workers(workers).await;
    let operation_id = Uuid::new_v4();
    let master = (*master_arc).clone();
    Ok(Self {
        config,
        path: path.to_string(),
        master,
        router,
        worker_pool,
        _context: Some(ctx),
        file_info,
        total_bytes_written: 0,
        operation_id,
        cancelled: AtomicBool::new(false),
        closed: AtomicBool::new(false),
        write_strategy,
        committed_block_ids: Vec::new(),
        current_block_writer: None,
        ufs_stream: None,
        ufs_worker_addr: None,
        ufs_stream_completed: AtomicBool::new(false),
    })
}
/// Appends `data` to the file.
///
/// Bytes go to the cache-block stream and/or the UFS stream, according to the write
/// strategy chosen at creation. An empty slice is a no-op.
///
/// # Errors
///
/// Fails if the writer has already been closed or cancelled, or if either stream
/// rejects the data.
pub async fn write(&mut self, data: &[u8]) -> Result<()> {
    let finished =
        self.cancelled.load(Ordering::SeqCst) || self.closed.load(Ordering::SeqCst);
    if finished {
        return Err(Error::BlockIoError {
            message: "cannot write to a completed or cancelled file".to_string(),
        });
    }
    if !data.is_empty() {
        if self.write_strategy.cache_stream {
            self.write_to_cache_stream(data).await?;
        }
        if self.write_strategy.ufs_stream {
            self.write_to_ufs_stream(data).await?;
        }
    }
    Ok(())
}
/// Flushes buffered data on every open stream.
///
/// The in-progress cache block is flushed if it holds data. The UFS stream (used by
/// CACHE_THROUGH / THROUGH writes) is flushed if it has been opened; it is only ever
/// opened by a write.
///
/// # Errors
///
/// Fails if the writer has been closed or cancelled, or if a stream flush fails.
pub async fn flush(&mut self) -> Result<()> {
    if self.cancelled.load(Ordering::SeqCst) || self.closed.load(Ordering::SeqCst) {
        return Err(Error::BlockIoError {
            message: "cannot flush a completed or cancelled file".to_string(),
        });
    }
    if let Some(active) = self.current_block_writer.as_mut() {
        if active.bytes_written > 0 {
            active.writer.flush().await?;
        }
    }
    // THROUGH writes have no cache block, so the UFS stream must be flushed too.
    if let Some(ufs) = self.ufs_stream.as_mut() {
        ufs.flush().await?;
    }
    Ok(())
}
async fn write_to_cache_stream(&mut self, data: &[u8]) -> Result<()> {
let block_size = self
.file_info
.block_size_bytes
.unwrap_or(self.config.block_size as i64) as u64;
let chunk_size = self.config.chunk_size as usize;
let mut offset = 0usize;
while offset < data.len() {
if self.current_block_writer.is_none()
|| self.current_block_writer.as_ref().unwrap().remaining() == 0
{
self.open_next_block(block_size).await?;
}
let writer = self.current_block_writer.as_mut().unwrap();
let remaining_in_block = writer.remaining() as usize;
let remaining_data = data.len() - offset;
let to_write = std::cmp::min(remaining_in_block, remaining_data);
let end = offset + to_write;
let mut chunk_offset = offset;
while chunk_offset < end {
let chunk_end = std::cmp::min(chunk_offset + chunk_size, end);
let chunk = Bytes::copy_from_slice(&data[chunk_offset..chunk_end]);
let chunk_len = chunk.len() as u64;
match writer.writer.write_chunk(chunk).await {
Ok(()) => {
writer.bytes_written += chunk_len;
}
Err(e) => {
return self.handle_cache_write_exception(e).await;
}
}
chunk_offset = chunk_end;
}
offset = end;
if writer.remaining() == 0 {
self.close_current_block().await?;
}
}
Ok(())
}
async fn write_to_ufs_stream(&mut self, data: &[u8]) -> Result<()> {
if self.ufs_stream.is_none() {
self.open_ufs_stream().await?;
}
let chunk_size = self.config.chunk_size as usize;
let ufs = self
.ufs_stream
.as_mut()
.expect("ufs_stream just opened above");
let total = data.len();
match ufs.write_all(data, chunk_size).await {
Ok(()) => {
self.total_bytes_written += total as u64;
Ok(())
}
Err(e) => self.handle_ufs_write_exception(e).await,
}
}
async fn open_next_block(&mut self, block_size: u64) -> Result<()> {
if self.current_block_writer.is_some() {
self.close_current_block().await?;
}
let file_id = self.file_info.file_id.unwrap_or(0);
let block_index = self.committed_block_ids.len() as u64;
let block_id = compute_block_id(file_id, block_index);
let worker_info = self.router.select_worker(block_id).await?;
let addr = worker_info
.address
.as_ref()
.ok_or_else(|| Error::Internal {
message: "worker has no address".to_string(),
source: None,
})?;
let worker_addr = format!(
"{}:{}",
addr.host.as_deref().unwrap_or("127.0.0.1"),
addr.rpc_port.unwrap_or(9203)
);
debug!(
block_id = block_id,
block_index = block_index,
worker = %worker_addr,
"opening new cache block writer"
);
let worker = match self.worker_pool.acquire(&worker_addr).await {
Ok(w) => w,
Err(e) => {
self.router.mark_failed(addr);
self.worker_pool.invalidate(&worker_addr).await;
return Err(e);
}
};
let write_opts = WriteBlockOptions {
request_type: RequestType::GoosefsBlock,
create_ufs_file_options: None,
};
let block_writer =
match GrpcBlockWriter::open(&worker, block_id, block_size as i64, write_opts).await {
Ok(w) => w,
Err(e) => {
self.router.mark_failed(addr);
self.worker_pool.invalidate(&worker_addr).await;
return Err(e);
}
};
self.current_block_writer = Some(ActiveBlockWriter {
writer: block_writer,
block_id,
block_size,
bytes_written: 0,
worker_addr,
});
Ok(())
}
/// Finalizes the in-progress cache block, if there is one.
///
/// A block holding data is flushed and closed, and its id is appended to
/// `committed_block_ids`. For strategies without a UFS stream, its length is also added
/// to `total_bytes_written`. An empty block is cancelled instead of committed.
///
/// # Errors
///
/// Propagates flush and close failures. If the flush fails, the block writer is
/// cancelled before the error is returned, so the partial block is not simply dropped.
async fn close_current_block(&mut self) -> Result<()> {
    if let Some(active) = self.current_block_writer.take() {
        let block_id = active.block_id;
        let bytes_written = active.bytes_written;
        let mut writer = active.writer;
        if bytes_written == 0 {
            writer.cancel().await;
            return Ok(());
        }
        let ack_offset = match writer.flush().await {
            Ok(offset) => offset,
            Err(e) => {
                // The writer was taken out of `current_block_writer`, so no other
                // cleanup path can reach it; cancel it here.
                writer.cancel().await;
                return Err(e);
            }
        };
        debug!(
            block_id = block_id,
            ack_offset = ack_offset,
            bytes_written = bytes_written,
            "cache block flushed"
        );
        writer.close().await?;
        self.committed_block_ids.push(block_id);
        if !self.write_strategy.ufs_stream {
            self.total_bytes_written += bytes_written;
        }
    }
    Ok(())
}
async fn open_ufs_stream(&mut self) -> Result<()> {
const UFS_BLOCK_ID: i64 = -1; const UFS_STREAM_LENGTH: i64 = i64::MAX;
let worker_info = self.router.pick_any_worker().await?;
let addr = worker_info
.address
.as_ref()
.ok_or_else(|| Error::Internal {
message: "ufs-stream worker has no address".to_string(),
source: None,
})?;
let worker_addr = format!(
"{}:{}",
addr.host.as_deref().unwrap_or("127.0.0.1"),
addr.rpc_port.unwrap_or(9203)
);
debug!(
worker = %worker_addr,
path = %self.path,
"opening UFS stream for CACHE_THROUGH/THROUGH"
);
let worker = match self.worker_pool.acquire(&worker_addr).await {
Ok(w) => w,
Err(e) => {
self.router.mark_failed(addr);
self.worker_pool.invalidate(&worker_addr).await;
return Err(e);
}
};
let write_opts = WriteBlockOptions {
request_type: RequestType::UfsFile,
create_ufs_file_options: self.write_strategy.create_ufs_file_options.clone(),
};
let writer =
match GrpcBlockWriter::open(&worker, UFS_BLOCK_ID, UFS_STREAM_LENGTH, write_opts).await
{
Ok(w) => w,
Err(e) => {
self.router.mark_failed(addr);
self.worker_pool.invalidate(&worker_addr).await;
return Err(e);
}
};
self.ufs_stream = Some(writer);
self.ufs_worker_addr = Some(worker_addr);
Ok(())
}
/// Handles a failed chunk write to the current cache block.
///
/// Logs the failure, reports the block's worker to the router as failed, evicts its
/// pooled client and cancels the block writer. Always returns `Err(err)`.
///
/// NOTE(review): the cancelled block is not recorded anywhere. A later `write()` opens
/// a new block with the same block index (`committed_block_ids.len()`), and the bytes
/// already sent to the cancelled block are missing from the file. Callers should
/// `cancel()` after a write error.
async fn handle_cache_write_exception(&mut self, err: Error) -> Result<()> {
    warn!(
        path = %self.path,
        error = %err,
        "failed to write to Goosefs cache, cancelling block"
    );
    if let Some(active) = self.current_block_writer.take() {
        // `worker_addr` was formatted as "host:port". Split on the last ':' so an IPv6
        // host keeps its own colons.
        let (host, rpc_port) = match active.worker_addr.rsplit_once(':') {
            Some((host, port)) => (host, port.parse().ok()),
            None => (active.worker_addr.as_str(), None),
        };
        self.router
            .mark_failed(&crate::proto::grpc::WorkerNetAddress {
                host: Some(host.to_string()),
                rpc_port,
                ..Default::default()
            });
        self.worker_pool.invalidate(&active.worker_addr).await;
        active.writer.cancel().await;
    }
    Err(err)
}
/// Handles a failed write to the UFS stream.
///
/// Logs the failure, cancels and drops the UFS stream, reports its worker to the router
/// as failed and evicts the pooled client. Always returns `Err(err)`.
async fn handle_ufs_write_exception(&mut self, err: Error) -> Result<()> {
    warn!(
        path = %self.path,
        error = %err,
        "failed to write to UFS stream"
    );
    if let Some(writer) = self.ufs_stream.take() {
        writer.cancel().await;
    }
    if let Some(worker_addr) = self.ufs_worker_addr.take() {
        // `worker_addr` was formatted as "host:port". Split on the last ':' so an IPv6
        // host keeps its own colons.
        let (host, rpc_port) = match worker_addr.rsplit_once(':') {
            Some((host, port)) => (host, port.parse().ok()),
            None => (worker_addr.as_str(), None),
        };
        self.router
            .mark_failed(&crate::proto::grpc::WorkerNetAddress {
                host: Some(host.to_string()),
                rpc_port,
                ..Default::default()
            });
        self.worker_pool.invalidate(&worker_addr).await;
    }
    Err(err)
}
/// Best-effort teardown used by `cancel()` and by the failure paths of `close()`.
///
/// Cancels the UFS stream and the in-progress cache block, then asks the master to
/// remove every committed cache block. If `remove_blocks` fails, it falls back to
/// deleting the file with `DeleteOptions::for_cancel()`. Failures are logged, never
/// returned.
async fn do_cancel_cleanup(&mut self) {
    if let Some(ufs) = self.ufs_stream.take() {
        ufs.cancel().await;
    }
    self.ufs_worker_addr = None;
    if let Some(block) = self.current_block_writer.take() {
        block.writer.cancel().await;
    }
    if self.committed_block_ids.is_empty() {
        return;
    }

    let ids = self.committed_block_ids.clone();
    debug!(
        path = %self.path,
        block_count = ids.len(),
        "cancel: calling remove_blocks on Master"
    );
    let remove_err = match self.master.remove_blocks(ids).await {
        Ok(_) => return,
        Err(e) => e,
    };
    warn!(
        path = %self.path,
        error = %remove_err,
        "remove_blocks failed, falling back to delete(unchecked=true)"
    );
    let fallback = self
        .master
        .delete_with_options(&self.path, DeleteOptions::for_cancel())
        .await;
    if let Err(del_err) = fallback {
        warn!(
            path = %self.path,
            error = %del_err,
            "fallback delete also failed — blocks may need manual cleanup"
        );
    }
}
/// Abandons the write and releases what it allocated.
///
/// Does nothing if the writer was already closed or cancelled. Otherwise marks it
/// cancelled and runs `do_cancel_cleanup`. Always returns `Ok(())`; cleanup failures
/// are only logged.
pub async fn cancel(&mut self) -> Result<()> {
    // `||` short-circuits, so a closed writer never has its cancel flag touched.
    let nothing_to_do =
        self.closed.load(Ordering::SeqCst) || self.cancelled.swap(true, Ordering::SeqCst);
    if nothing_to_do {
        return Ok(());
    }
    self.do_cancel_cleanup().await;
    info!(
        path = %self.path,
        committed_blocks = self.committed_block_ids.len(),
        "file write cancelled"
    );
    Ok(())
}
/// Post-processes a `complete_file` failure and hands the error back.
///
/// If the UFS stream had already been closed successfully, the Goosefs metadata entry
/// is deleted with `DeleteOptions::goosefs_only_unchecked()`. Judging by the option's
/// name, this presumably leaves the UFS file in place. A cleanup failure is only
/// logged; the original error is always returned.
async fn handle_complete_file_error(&mut self, err: Error) -> Error {
    let ufs_already_closed = self.ufs_stream_completed.load(Ordering::SeqCst);
    if !ufs_already_closed {
        return err;
    }
    warn!(
        path = %self.path,
        error = %err,
        "completeFile failed after UFS close succeeded; \
         removing Goosefs-only metadata entry (goosefs_only=true, unchecked=true)"
    );
    let cleanup = self
        .master
        .delete_with_options(&self.path, DeleteOptions::goosefs_only_unchecked())
        .await;
    if let Err(del_err) = cleanup {
        warn!(
            path = %self.path,
            error = %del_err,
            "failed to clean up Goosefs metadata after completeFile failure — \
             manual cleanup may be required"
        );
    }
    err
}
pub async fn close(&mut self) -> Result<()> {
if self
.closed
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
warn!(path = %self.path, "close() called on already-completed file");
return Ok(());
}
if self.cancelled.load(Ordering::SeqCst) {
return Ok(());
}
if let Some(mut ufs) = self.ufs_stream.take() {
if let Err(e) = ufs.flush().await {
warn!(
path = %self.path,
error = %e,
"failed to flush UFS stream during close, cancelling"
);
ufs.cancel().await;
self.do_cancel_cleanup().await;
return Err(e);
}
if let Err(e) = ufs.close().await {
warn!(
path = %self.path,
error = %e,
"failed to close UFS stream during close, cancelling"
);
self.do_cancel_cleanup().await;
return Err(e);
}
self.ufs_stream_completed.store(true, Ordering::SeqCst);
self.ufs_worker_addr = None;
}
if let Err(e) = self.close_current_block().await {
warn!(
path = %self.path,
error = %e,
"failed to close current block during file close, cancelling"
);
self.do_cancel_cleanup().await;
return Err(e);
}
let ufs_length = if self.write_strategy.ufs_stream || self.total_bytes_written > 0 {
Some(self.total_bytes_written as i64)
} else {
None
};
let op_id = uuid_to_fs_op_pid(self.operation_id);
if let Err(e) = self
.master
.complete_file(&self.path, ufs_length, Some(op_id))
.await
{
let e = self.handle_complete_file_error(e).await;
return Err(e);
}
if self.write_strategy.need_async_persist {
debug!(path = %self.path, "scheduling async persistence for ASYNC_THROUGH");
if let Err(e) = self
.master
.schedule_async_persistence(&self.path, None)
.await
{
warn!(
path = %self.path,
error = %e,
"failed to schedule async persistence — file is complete but may not persist to UFS"
);
}
}
info!(
path = %self.path,
total_bytes = self.total_bytes_written,
cache_blocks = self.committed_block_ids.len(),
ufs_stream = self.write_strategy.ufs_stream,
"file write completed"
);
Ok(())
}
pub async fn write_file_with_context(
ctx: Arc<FileSystemContext>,
path: &str,
data: &[u8],
) -> Result<u64> {
Self::write_file_with_context_and_options(ctx, path, data, None).await
}
pub async fn write_file_with_context_and_options(
ctx: Arc<FileSystemContext>,
path: &str,
data: &[u8],
options: Option<CreateFilePOptions>,
) -> Result<u64> {
let mut writer = Self::create_with_context(ctx, path, options).await?;
writer.write(data).await?;
writer.close().await?;
Ok(writer.total_bytes_written)
}
/// Bytes accounted so far. In UFS modes these are bytes accepted by the UFS stream;
/// otherwise, bytes in committed cache blocks.
pub fn bytes_written(&self) -> u64 {
    self.total_bytes_written
}

/// Destination path of this writer.
pub fn path(&self) -> &str {
    self.path.as_str()
}

/// `true` once `close()` has been entered on a writer that was not cancelled.
///
/// Note: `close()` sets its flag on entry, so this is also `true` after a close that
/// returned an error.
pub fn is_completed(&self) -> bool {
    let closed = self.closed.load(Ordering::SeqCst);
    let cancelled = self.cancelled.load(Ordering::SeqCst);
    matches!((closed, cancelled), (true, false))
}

/// `true` once `cancel()` has run on a writer that was not already closed.
pub fn is_cancelled(&self) -> bool {
    self.cancelled.load(Ordering::SeqCst)
}

/// File metadata returned by the master when the file was created.
pub fn file_info(&self) -> &FileInfo {
    &self.file_info
}
}
/// Derives the id of the `block_index`-th block of file `file_id`.
///
/// A block id packs a 40-bit container id above a 24-bit sequence number. The container
/// is taken from the file id's upper 40 bits, and the block index is masked to its low
/// 24 bits (larger indices wrap).
fn compute_block_id(file_id: i64, block_index: u64) -> i64 {
    const SEQ_BITS: u32 = 24;
    const CONTAINER_BITS: u32 = 64 - SEQ_BITS;
    let container_mask = (1i64 << CONTAINER_BITS) - 1;
    let sequence_mask = (1u64 << SEQ_BITS) - 1;
    let container = (file_id >> SEQ_BITS) & container_mask;
    let sequence = (block_index & sequence_mask) as i64;
    (container << SEQ_BITS) | sequence
}
/// A cache block that is currently being written.
struct ActiveBlockWriter {
    /// Stream to the worker that holds the block.
    writer: GrpcBlockWriter,
    /// Id produced by `compute_block_id` for this block.
    block_id: i64,
    /// Capacity of the block in bytes.
    block_size: u64,
    /// Bytes whose chunks were written successfully so far.
    bytes_written: u64,
    /// `host:port` of the worker, used to report failures and evict its pooled client.
    worker_addr: String,
}

impl ActiveBlockWriter {
    /// Bytes that still fit in this block.
    ///
    /// Saturates at zero. If the `bytes_written <= block_size` invariant were ever
    /// broken, the caller rolls over to a new block instead of panicking (debug builds)
    /// or wrapping to a huge remaining size (release builds).
    fn remaining(&self) -> u64 {
        self.block_size.saturating_sub(self.bytes_written)
    }
}
impl Drop for GoosefsFileWriter {
    /// Warns when a writer that accepted data is dropped without `close()` or `cancel()`.
    ///
    /// No cleanup happens here (the cleanup paths are async). The warning flags a file
    /// that is probably left incomplete. Bytes in the uncommitted cache block are
    /// considered too, because `total_bytes_written` excludes them in cache-only modes.
    fn drop(&mut self) {
        if self.closed.load(Ordering::SeqCst) || self.cancelled.load(Ordering::SeqCst) {
            return;
        }
        let pending_block_bytes = self
            .current_block_writer
            .as_ref()
            .map_or(0, |active| active.bytes_written);
        if self.total_bytes_written > 0 || pending_block_bytes > 0 {
            warn!(
                path = %self.path,
                bytes_written = self.total_bytes_written,
                pending_block_bytes = pending_block_bytes,
                committed_blocks = self.committed_block_ids.len(),
                "GoosefsFileWriter dropped without calling close() or cancel() — file may be incomplete"
            );
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Block ids keep the container id from the file id's upper bits and put the block
// index in the low 24 bits. 33554431 = 0x1FF_FFFF is container 1 with an all-ones
// sequence, so its blocks start at 1 << 24.
#[test]
fn test_compute_block_id() {
let inode_id = 33554431i64; assert_eq!(compute_block_id(inode_id, 0), 1 << 24);
assert_eq!(compute_block_id(inode_id, 1), (1 << 24) | 1);
let inode_id_2 = 2i64 << 24;
assert_eq!(compute_block_id(inode_id_2, 0), 2 << 24);
}
// Decodes a block id back into (container, sequence) using the same 40/24-bit layout.
#[test]
fn test_compute_block_id_container_extraction() {
const SEQUENCE_NUMBER_BITS: u32 = 24;
const CONTAINER_ID_MASK: i64 = (1i64 << 40) - 1;
let file_id = 33554431i64;
let block_id = compute_block_id(file_id, 3);
let container_id = (block_id >> SEQUENCE_NUMBER_BITS) & CONTAINER_ID_MASK;
assert_eq!(container_id, 1);
assert_eq!(block_id & ((1 << SEQUENCE_NUMBER_BITS) - 1), 3);
}
// FileInfo populated with every field `resolve_write_strategy` copies into
// CreateUfsFileOptions.
fn make_test_file_info() -> FileInfo {
FileInfo {
file_id: Some(1),
ufs_path: Some("/ufs/data/test.txt".to_string()),
owner: Some("hadoop".to_string()),
group: Some("supergroup".to_string()),
mode: Some(0o644),
mount_id: Some(42),
..Default::default()
}
}
// Write type 1 (MUST_CACHE): cache only, no UFS options, no async persistence.
#[test]
fn test_strategy_must_cache() {
let fi = make_test_file_info();
let s = resolve_write_strategy(Some(1), &fi); assert!(s.cache_stream);
assert!(!s.ufs_stream);
assert!(s.create_ufs_file_options.is_none());
assert!(!s.need_async_persist);
}
// Write type 3 (CACHE_THROUGH): cache and UFS streams together.
#[test]
fn test_strategy_cache_through() {
let fi = make_test_file_info();
let s = resolve_write_strategy(Some(3), &fi); assert!(s.cache_stream, "CACHE_THROUGH must enable cache stream");
assert!(s.ufs_stream, "CACHE_THROUGH must enable UFS stream");
assert!(s.create_ufs_file_options.is_some());
assert!(!s.need_async_persist);
}
// Write type 4 (THROUGH): UFS only; the UFS options mirror the FileInfo fields.
#[test]
fn test_strategy_through() {
let fi = make_test_file_info();
let s = resolve_write_strategy(Some(4), &fi); assert!(!s.cache_stream, "THROUGH must NOT enable cache stream");
assert!(s.ufs_stream);
let ufs_opts = s.create_ufs_file_options.as_ref().unwrap();
assert_eq!(ufs_opts.ufs_path, Some("/ufs/data/test.txt".to_string()));
assert_eq!(ufs_opts.owner, Some("hadoop".to_string()));
assert_eq!(ufs_opts.group, Some("supergroup".to_string()));
assert_eq!(ufs_opts.mode, Some(0o644));
assert_eq!(ufs_opts.mount_id, Some(42));
assert!(!s.need_async_persist);
}
// Write type 5 (ASYNC_THROUGH): cache now, persistence scheduled on close.
#[test]
fn test_strategy_async_through() {
let fi = make_test_file_info();
let s = resolve_write_strategy(Some(5), &fi); assert!(s.cache_stream);
assert!(!s.ufs_stream);
assert!(s.create_ufs_file_options.is_none());
assert!(s.need_async_persist);
}
// No write type falls back to cache-only.
#[test]
fn test_strategy_default_unset() {
let fi = make_test_file_info();
let s = resolve_write_strategy(None, &fi);
assert!(s.cache_stream);
assert!(!s.ufs_stream);
assert!(s.create_ufs_file_options.is_none());
assert!(!s.need_async_persist);
}
// Write type 2 (TRY_CACHE) takes the cache-only fallback branch.
#[test]
fn test_strategy_try_cache() {
let fi = make_test_file_info();
let s = resolve_write_strategy(Some(2), &fi); assert!(s.cache_stream);
assert!(!s.ufs_stream);
assert!(s.create_ufs_file_options.is_none());
assert!(!s.need_async_persist);
}
// Type-level check only: confirms the `_context` field type can be `None`. It does not
// construct a GoosefsFileWriter.
#[test]
fn test_context_field_is_option_arc() {
let ctx_field: Option<Arc<FileSystemContext>> = None;
assert!(ctx_field.is_none());
}
// UUID bytes are big-endian, so each 8-byte half should equal `i64::from_be_bytes` of
// those bytes, including a half whose top bit is set (the low half here).
#[test]
fn test_uuid_to_fs_op_pid_bit_layout() {
let high_bytes: [u8; 8] = [0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77];
let low_bytes: [u8; 8] = [0x88u8, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff];
let mut bytes = [0u8; 16];
bytes[..8].copy_from_slice(&high_bytes);
bytes[8..].copy_from_slice(&low_bytes);
let uuid = Uuid::from_bytes(bytes);
let op_id = uuid_to_fs_op_pid(uuid);
let expected_high = i64::from_be_bytes(high_bytes);
let expected_low = i64::from_be_bytes(low_bytes);
assert_eq!(op_id.most_significant_bits, Some(expected_high));
assert_eq!(op_id.least_significant_bits, Some(expected_low));
}
}