1use std::sync::Arc;
2use std::time::Duration;
3
4use bytes::{BufMut, Bytes, BytesMut};
5use futures::stream::BoxStream;
6use futures::{stream, Stream, StreamExt};
7use log::warn;
8use tokio::runtime::Handle;
9
10use crate::common::config::Configuration;
11use crate::ec::{resolve_ec_policy, EcSchema};
12use crate::hdfs::block_reader::get_block_stream;
13use crate::hdfs::block_writer::BlockWriter;
14use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
15use crate::proto::hdfs;
16use crate::{HdfsError, Result};
17
/// Initial delay, in milliseconds, between attempts to complete a file on close.
/// The delay is doubled after every unsuccessful attempt.
const COMPLETE_RETRY_DELAY_MS: u64 = 500;

/// Maximum number of `complete` attempts made while closing a file.
const COMPLETE_RETRIES: u32 = 5;
20
21pub struct FileReader {
22 protocol: Arc<NamenodeProtocol>,
23 located_blocks: hdfs::LocatedBlocksProto,
24 ec_schema: Option<EcSchema>,
25 position: usize,
26 config: Arc<Configuration>,
27 handle: Handle,
28}
29
30impl FileReader {
31 pub(crate) fn new(
32 protocol: Arc<NamenodeProtocol>,
33 located_blocks: hdfs::LocatedBlocksProto,
34 ec_schema: Option<EcSchema>,
35 config: Arc<Configuration>,
36 handle: Handle,
37 ) -> Self {
38 Self {
39 protocol,
40 located_blocks,
41 ec_schema,
42 position: 0,
43 config,
44 handle,
45 }
46 }
47
48 pub fn file_length(&self) -> usize {
50 self.located_blocks.file_length as usize
51 }
52
53 pub fn remaining(&self) -> usize {
55 if self.position > self.file_length() {
56 0
57 } else {
58 self.file_length() - self.position
59 }
60 }
61
62 pub fn seek(&mut self, pos: usize) {
64 if pos > self.file_length() {
65 panic!("Cannot seek beyond the end of a file");
66 }
67 self.position = pos;
68 }
69
70 pub fn tell(&self) -> usize {
72 self.position
73 }
74
75 pub async fn read(&mut self, len: usize) -> Result<Bytes> {
78 if self.position >= self.file_length() {
79 Ok(Bytes::new())
80 } else {
81 let offset = self.position;
82 self.position = usize::min(self.position + len, self.file_length());
83 self.read_range(offset, self.position - offset).await
84 }
85 }
86
87 pub async fn read_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
90 if self.position >= self.file_length() {
91 Ok(0)
92 } else {
93 let offset = self.position;
94 self.position = usize::min(self.position + buf.len(), self.file_length());
95 let read_bytes = self.position - offset;
96 self.read_range_buf(&mut buf[..read_bytes], offset).await?;
97 Ok(read_bytes)
98 }
99 }
100
101 pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
106 let mut stream = self.read_range_stream(offset, len);
107 let mut buf = BytesMut::with_capacity(len);
108 while let Some(bytes) = stream.next().await.transpose()? {
109 buf.put(bytes);
110 }
111 Ok(buf.freeze())
112 }
113
114 pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
118 let mut stream = self.read_range_stream(offset, buf.len());
119 while let Some(bytes) = stream.next().await.transpose()? {
120 buf.put(bytes);
121 }
122
123 Ok(())
124 }
125
126 pub fn read_range_stream(
130 &self,
131 offset: usize,
132 len: usize,
133 ) -> impl Stream<Item = Result<Bytes>> {
134 if offset + len > self.file_length() {
135 panic!("Cannot read past end of the file");
136 }
137
138 let block_streams: Vec<BoxStream<Result<Bytes>>> = self
139 .located_blocks
140 .blocks
141 .iter()
142 .flat_map(move |block| {
143 let block_file_start = block.offset as usize;
144 let block_file_end = block_file_start + block.b.num_bytes() as usize;
145
146 if block_file_start < (offset + len) && block_file_end > offset {
147 let block_start = offset - usize::min(offset, block_file_start);
149 let block_end = usize::min(offset + len, block_file_end) - block_file_start;
150 Some(get_block_stream(
151 Arc::clone(&self.protocol),
152 block.clone(),
153 block_start,
154 block_end - block_start,
155 self.ec_schema.clone(),
156 Arc::clone(&self.config),
157 self.handle.clone(),
158 ))
159 } else {
160 None
162 }
163 })
164 .collect();
165
166 stream::iter(block_streams).flatten()
167 }
168}
169
170pub struct FileWriter {
171 src: String,
172 protocol: Arc<NamenodeProtocol>,
173 status: hdfs::HdfsFileStatusProto,
174 config: Arc<Configuration>,
175 handle: Handle,
176 block_writer: Option<BlockWriter>,
177 last_block: Option<hdfs::LocatedBlockProto>,
178 closed: bool,
179 bytes_written: usize,
180}
181
182impl FileWriter {
183 pub(crate) fn new(
184 protocol: Arc<NamenodeProtocol>,
185 src: String,
186 status: hdfs::HdfsFileStatusProto,
187 config: Arc<Configuration>,
188 handle: Handle,
189 last_block: Option<hdfs::LocatedBlockProto>,
191 ) -> Self {
192 protocol.add_file_lease(status.file_id(), status.namespace.clone());
193 Self {
194 protocol,
195 src,
196 status,
197 config,
198 handle,
199 block_writer: None,
200 last_block,
201 closed: false,
202 bytes_written: 0,
203 }
204 }
205
206 async fn create_block_writer(&mut self) -> Result<()> {
207 let new_block = if let Some(last_block) = self.last_block.take() {
208 if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
210 {
211 last_block
213 } else {
214 self.protocol
216 .add_block(&self.src, Some(last_block.b), self.status.file_id)
217 .await?
218 .block
219 }
220 } else {
221 let extended_block = if let Some(block_writer) = self.block_writer.take() {
224 Some(block_writer.close().await?)
225 } else {
226 None
227 };
228
229 self.protocol
230 .add_block(&self.src, extended_block, self.status.file_id)
231 .await?
232 .block
233 };
234
235 let block_writer = BlockWriter::new(
236 Arc::clone(&self.protocol),
237 new_block,
238 self.protocol.get_cached_server_defaults().await?,
239 Arc::clone(&self.config),
240 self.handle.clone(),
241 self.status
242 .ec_policy
243 .as_ref()
244 .map(resolve_ec_policy)
245 .transpose()?
246 .as_ref(),
247 &self.src,
248 &self.status,
249 )
250 .await?;
251
252 self.block_writer = Some(block_writer);
253 Ok(())
254 }
255
256 async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
257 if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
259 self.create_block_writer().await?;
260 }
261
262 Ok(self.block_writer.as_mut().unwrap())
263 }
264
265 pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
266 let bytes_to_write = buf.len();
267 while !buf.is_empty() {
269 let block_writer = self.get_block_writer().await?;
270
271 block_writer.write(&mut buf).await?;
272 }
273
274 self.bytes_written += bytes_to_write;
275
276 Ok(bytes_to_write)
277 }
278
279 pub async fn close(&mut self) -> Result<()> {
280 if !self.closed {
281 let extended_block = if let Some(block_writer) = self.block_writer.take() {
282 Some(block_writer.close().await?)
283 } else {
284 None
285 };
286
287 let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
288 let mut retries = 0;
289 while retries < COMPLETE_RETRIES {
290 let successful = self
291 .protocol
292 .complete(&self.src, extended_block.clone(), self.status.file_id)
293 .await?
294 .result;
295
296 if successful {
297 self.closed = true;
298 return Ok(());
299 }
300
301 let sleep = {
303 let _guard = self.handle.enter();
304 tokio::time::sleep(Duration::from_millis(retry_delay))
305 };
306 sleep.await;
307
308 retry_delay *= 2;
309 retries += 1;
310 }
311 Err(HdfsError::OperationFailed(
312 "Failed to complete file in time".to_string(),
313 ))
314 } else {
315 Ok(())
316 }
317 }
318}
319
320impl Drop for FileWriter {
321 fn drop(&mut self) {
322 if !self.closed {
323 warn!("FileWriter dropped without being closed. File content may not have saved or may not be complete");
324 }
325
326 self.protocol
327 .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
328 }
329}