use goosefs_sdk::block::WorkerRouter;
use goosefs_sdk::client::{MasterClient, WorkerClient, WorkerManagerClient};
use goosefs_sdk::config::GoosefsConfig;
use goosefs_sdk::error::Result;
use goosefs_sdk::io::GrpcBlockReader;
#[tokio::main]
async fn main() -> Result<()> {
println!("Goosefs Low-level Block Streaming Read Demo");
println!("==========================");
println!("\n1. Connecting to Goosefs Master...");
let config = GoosefsConfig::new("127.0.0.1:9200");
let master = MasterClient::connect(&config).await?;
println!("Connected!");
println!("\n2. Checking test file...");
let test_file_path = "/e2e-test/hello.txt";
let file_info = match master.get_status(test_file_path).await {
Ok(info) => info,
Err(_) => {
println!(
"Test file {} does not exist. Please run the highlevel_file_rw example first.",
test_file_path
);
println!("Run: cargo run --example highlevel_file_rw");
return Ok(());
}
};
println!("File info:");
println!(" Path: {:?}", file_info.path);
println!(" Length: {:?} bytes", file_info.length);
println!(" Block size: {:?}", file_info.block_size_bytes);
println!(" File ID: {:?}", file_info.file_id);
println!(" Block IDs: {:?}", file_info.block_ids);
println!(" File block info: {:?}", file_info.file_block_infos);
println!("\n3. Discovering workers and building router...");
let wm = WorkerManagerClient::connect(&config).await?;
let workers = wm.get_worker_info_list().await?;
println!("Discovered {} workers", workers.len());
let router = WorkerRouter::new();
router.update_workers(workers).await;
println!("\n4. Mapping file range to block-level read plan...");
let file_length = file_info.length.unwrap_or(0) as u64;
let block_id = if let Some(first_block) = file_info.file_block_infos.first() {
if let Some(block_info) = &first_block.block_info {
block_info.block_id.unwrap_or(0)
} else {
0
}
} else {
0
};
println!("Using block ID: {}", block_id);
println!("File length: {} bytes", file_length);
println!("\n5. Streaming block read...");
let mut total_bytes_read = 0;
if block_id > 0 {
println!("\nReading block (ID: {})...", block_id);
let worker_info = match router.select_worker(block_id).await {
Ok(info) => info,
Err(e) => {
println!("Failed to select worker: {:?}", e);
return Ok(());
}
};
let addr = worker_info.address.as_ref().unwrap();
let worker_addr = format!(
"{}:{}",
addr.host.as_deref().unwrap_or("127.0.0.1"),
addr.rpc_port.unwrap_or(9203)
);
println!(" Connecting to worker: {}", worker_addr);
let worker = match WorkerClient::connect(&worker_addr, &config).await {
Ok(client) => client,
Err(e) => {
println!("Failed to connect to worker: {:?}", e);
return Ok(());
}
};
let mut reader = match GrpcBlockReader::open(
&worker,
block_id,
0, file_length as i64,
config.chunk_size as i64,
None, )
.await
{
Ok(reader) => reader,
Err(e) => {
println!("Failed to create block reader: {:?}", e);
return Ok(());
}
};
let data = match reader.read_all().await {
Ok(data) => data,
Err(e) => {
println!("Failed to read block data: {:?}", e);
return Ok(());
}
};
total_bytes_read += data.len();
println!(
" Read {} bytes (complete: {})",
data.len(),
reader.is_complete()
);
if let Ok(content) = String::from_utf8(data.to_vec()) {
println!(
" Content preview: {:?}",
if content.len() > 50 {
format!("{}...", &content[..50])
} else {
content
}
);
}
} else {
println!("Invalid block ID, cannot read");
}
println!("\n6. Read complete");
println!("Total bytes read: {} bytes", total_bytes_read);
println!("Total file length: {} bytes", file_length);
if total_bytes_read == file_length as usize {
println!("✅ Successfully read the entire file!");
} else {
println!("⚠️ Bytes read does not match file length");
}
Ok(())
}