1use std::io::{self, SeekFrom};
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::{BufMut, Bytes, BytesMut};
8use futures::stream::BoxStream;
9use futures::{Stream, StreamExt, stream};
10use log::warn;
11use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
12use tokio::runtime::Handle;
13
14use crate::common::config::Configuration;
15use crate::ec::{EcSchema, resolve_ec_policy};
16use crate::hdfs::block_reader::get_block_stream;
17use crate::hdfs::block_writer::BlockWriter;
18use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
19use crate::proto::hdfs;
20use crate::{HdfsError, Result};
21
/// Initial delay, in milliseconds, between attempts to complete a file in
/// `FileWriter::close`; the delay is doubled after every unsuccessful attempt.
const COMPLETE_RETRY_DELAY_MS: u64 = 500;
/// Maximum number of `complete` calls `FileWriter::close` makes before giving up.
const COMPLETE_RETRIES: u32 = 5;
24
25fn io_error(error: HdfsError) -> io::Error {
26 io::Error::other(error)
27}
28
/// State of a ranged read that was started by `AsyncRead::poll_read` and has
/// not been fully drained yet.
struct PendingRead {
    /// Stream yielding the chunks of the requested byte range.
    stream: BoxStream<'static, Result<Bytes>>,
    /// File offset one past the last byte of the range `stream` was created for.
    end_position: usize,
}
33
/// Reader over a single HDFS file with an internal cursor.
///
/// Supports positional reads (`read_range*`), cursor based reads
/// (`read_bytes`, `read_into`) and the tokio `AsyncRead` / `AsyncSeek` traits.
pub struct FileReader {
    /// Namenode connection, handed to every block stream that is opened.
    protocol: Arc<NamenodeProtocol>,
    /// Block layout of the file; also the source of the file length.
    located_blocks: hdfs::LocatedBlocksProto,
    /// Erasure coding schema forwarded to the block streams, if any.
    ec_schema: Option<EcSchema>,
    /// Current cursor position as a byte offset from the start of the file.
    position: usize,
    /// Client configuration forwarded to the block streams.
    config: Arc<Configuration>,
    /// Runtime handle forwarded to the block streams.
    handle: Handle,
    /// In-flight read used by the `AsyncRead` implementation; cleared whenever
    /// the cursor is moved by any other means.
    // NOTE(review): the `Mutex` is presumably only here to keep `FileReader`
    // `Sync` despite the boxed stream — confirm.
    pending_read: Option<std::sync::Mutex<PendingRead>>,
}
43
44impl FileReader {
45 pub(crate) fn new(
46 protocol: Arc<NamenodeProtocol>,
47 located_blocks: hdfs::LocatedBlocksProto,
48 ec_schema: Option<EcSchema>,
49 config: Arc<Configuration>,
50 handle: Handle,
51 ) -> Self {
52 Self {
53 protocol,
54 located_blocks,
55 ec_schema,
56 position: 0,
57 config,
58 handle,
59 pending_read: None,
60 }
61 }
62
63 pub fn file_length(&self) -> usize {
65 self.located_blocks.file_length as usize
66 }
67
68 pub fn remaining(&self) -> usize {
70 if self.position > self.file_length() {
71 0
72 } else {
73 self.file_length() - self.position
74 }
75 }
76
77 pub fn set_position(&mut self, pos: usize) {
79 if pos > self.file_length() {
80 panic!("Cannot seek beyond the end of a file");
81 }
82 self.pending_read = None;
83 self.position = pos;
84 }
85
86 pub fn tell(&self) -> usize {
88 self.position
89 }
90
91 pub async fn read_bytes(&mut self, len: usize) -> Result<Bytes> {
94 self.pending_read = None;
95 if self.position >= self.file_length() {
96 Ok(Bytes::new())
97 } else {
98 let offset = self.position;
99 self.position = usize::min(self.position + len, self.file_length());
100 self.read_range(offset, self.position - offset).await
101 }
102 }
103
104 pub async fn read_into(&mut self, buf: &mut [u8]) -> Result<usize> {
107 self.pending_read = None;
108 if self.position >= self.file_length() {
109 Ok(0)
110 } else {
111 let offset = self.position;
112 self.position = usize::min(self.position + buf.len(), self.file_length());
113 let read_bytes = self.position - offset;
114 self.read_range_buf(&mut buf[..read_bytes], offset).await?;
115 Ok(read_bytes)
116 }
117 }
118
119 pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
124 let mut stream = self.read_range_stream(offset, len);
125 let mut buf = BytesMut::with_capacity(len);
126 while let Some(bytes) = stream.next().await.transpose()? {
127 buf.put(bytes);
128 }
129 Ok(buf.freeze())
130 }
131
132 pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
136 let mut stream = self.read_range_stream(offset, buf.len());
137 while let Some(bytes) = stream.next().await.transpose()? {
138 buf.put(bytes);
139 }
140
141 Ok(())
142 }
143
144 pub fn read_range_stream(
148 &self,
149 offset: usize,
150 len: usize,
151 ) -> impl Stream<Item = Result<Bytes>> + use<> {
152 if offset + len > self.file_length() {
153 panic!("Cannot read past end of the file");
154 }
155
156 let block_streams: Vec<BoxStream<Result<Bytes>>> = self
157 .located_blocks
158 .blocks
159 .iter()
160 .flat_map(move |block| {
161 let block_file_start = block.offset as usize;
162 let block_file_end = block_file_start + block.b.num_bytes() as usize;
163
164 if block_file_start < (offset + len) && block_file_end > offset {
165 let block_start = offset - usize::min(offset, block_file_start);
167 let block_end = usize::min(offset + len, block_file_end) - block_file_start;
168 Some(get_block_stream(
169 Arc::clone(&self.protocol),
170 block.clone(),
171 block_start,
172 block_end - block_start,
173 self.ec_schema.clone(),
174 Arc::clone(&self.config),
175 self.handle.clone(),
176 ))
177 } else {
178 None
180 }
181 })
182 .collect();
183
184 stream::iter(block_streams).flatten()
185 }
186}
187
188impl AsyncRead for FileReader {
189 fn poll_read(
190 mut self: Pin<&mut Self>,
191 cx: &mut Context<'_>,
192 buf: &mut ReadBuf<'_>,
193 ) -> Poll<io::Result<()>> {
194 if buf.remaining() == 0 {
195 return Poll::Ready(Ok(()));
196 }
197
198 let starting_len = buf.filled().len();
199
200 loop {
201 if self.pending_read.is_none() {
202 if self.position >= self.file_length() {
203 return Poll::Ready(Ok(()));
204 }
205
206 let offset = self.position;
207 let len = usize::min(buf.remaining(), self.file_length() - self.position);
208 let stream = self.read_range_stream(offset, len).boxed();
209 self.pending_read = Some(std::sync::Mutex::new(PendingRead {
210 stream,
211 end_position: offset + len,
212 }));
213 }
214
215 let poll_result = {
216 let mut pending = self.pending_read.as_ref().unwrap().lock().unwrap();
217 Pin::new(&mut pending.stream).poll_next(cx)
218 };
219
220 match poll_result {
221 Poll::Ready(Some(Ok(bytes))) => {
222 self.position += bytes.len();
223 buf.put_slice(&bytes);
224
225 if self.pending_read.as_ref().is_some_and(|pending| {
226 self.position >= pending.lock().unwrap().end_position
227 }) {
228 self.pending_read = None;
229 }
230
231 if buf.remaining() == 0 {
232 return Poll::Ready(Ok(()));
233 }
234 }
235 Poll::Ready(Some(Err(error))) => {
236 self.pending_read = None;
237 return Poll::Ready(Err(io_error(error)));
238 }
239 Poll::Ready(None) => {
240 self.pending_read = None;
241 return Poll::Ready(Ok(()));
242 }
243 Poll::Pending => {
244 if buf.filled().len() > starting_len {
245 return Poll::Ready(Ok(()));
246 }
247 return Poll::Pending;
248 }
249 }
250 }
251 }
252}
253
254impl AsyncSeek for FileReader {
255 fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
256 let file_length = self.file_length() as i128;
257 let current = self.tell() as i128;
258 let new_pos = match position {
259 SeekFrom::Start(pos) => i128::from(pos),
260 SeekFrom::End(offset) => file_length + i128::from(offset),
261 SeekFrom::Current(offset) => current + i128::from(offset),
262 };
263
264 if new_pos < 0 || new_pos > file_length {
265 return Err(io::Error::new(
266 io::ErrorKind::InvalidInput,
267 "cannot seek outside of file bounds",
268 ));
269 }
270
271 self.set_position(new_pos as usize);
272 Ok(())
273 }
274
275 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
276 Poll::Ready(Ok(self.tell() as u64))
277 }
278}
279
/// Writer for a single HDFS file; data is written block by block and the file
/// must be finished with `close`.
pub struct FileWriter {
    /// Path of the file being written, sent with every namenode request.
    src: String,
    /// Namenode connection used to add blocks and complete the file.
    protocol: Arc<NamenodeProtocol>,
    /// Status of the file being written (file id, block size, EC policy, ...).
    status: hdfs::HdfsFileStatusProto,
    /// Client configuration forwarded to the block writers.
    config: Arc<Configuration>,
    /// Runtime handle forwarded to the block writers and used for retry sleeps.
    handle: Handle,
    /// Writer for the block currently being filled, if one is open.
    block_writer: Option<BlockWriter>,
    /// Pre-existing last block of the file; consumed when the first block
    /// writer is created.
    last_block: Option<hdfs::LocatedBlockProto>,
    /// Set once the namenode has confirmed the file as complete.
    closed: bool,
    /// Running total of the bytes accepted by `write_bytes`.
    bytes_written: usize,
}
291
292impl FileWriter {
293 pub(crate) fn new(
294 protocol: Arc<NamenodeProtocol>,
295 src: String,
296 status: hdfs::HdfsFileStatusProto,
297 config: Arc<Configuration>,
298 handle: Handle,
299 last_block: Option<hdfs::LocatedBlockProto>,
301 ) -> Self {
302 protocol.add_file_lease(status.file_id(), status.namespace.clone());
303 Self {
304 protocol,
305 src,
306 status,
307 config,
308 handle,
309 block_writer: None,
310 last_block,
311 closed: false,
312 bytes_written: 0,
313 }
314 }
315
316 async fn create_block_writer(&mut self) -> Result<()> {
317 let new_block = if let Some(last_block) = self.last_block.take() {
318 if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
320 {
321 last_block
323 } else {
324 self.protocol
326 .add_block(&self.src, Some(last_block.b), self.status.file_id)
327 .await?
328 .block
329 }
330 } else {
331 let extended_block = if let Some(block_writer) = self.block_writer.take() {
334 Some(block_writer.close().await?)
335 } else {
336 None
337 };
338
339 self.protocol
340 .add_block(&self.src, extended_block, self.status.file_id)
341 .await?
342 .block
343 };
344
345 let block_writer = BlockWriter::new(
346 Arc::clone(&self.protocol),
347 new_block,
348 self.protocol.get_cached_server_defaults().await?,
349 Arc::clone(&self.config),
350 self.handle.clone(),
351 self.status
352 .ec_policy
353 .as_ref()
354 .map(resolve_ec_policy)
355 .transpose()?
356 .as_ref(),
357 &self.src,
358 &self.status,
359 )
360 .await?;
361
362 self.block_writer = Some(block_writer);
363 Ok(())
364 }
365
366 async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
367 if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
369 self.create_block_writer().await?;
370 }
371
372 Ok(self.block_writer.as_mut().unwrap())
373 }
374
375 pub async fn write_bytes(&mut self, mut buf: Bytes) -> Result<usize> {
376 let bytes_to_write = buf.len();
377 while !buf.is_empty() {
378 let block_writer = self.get_block_writer().await?;
379
380 block_writer.write(&mut buf).await?;
381 }
382
383 self.bytes_written += bytes_to_write;
384
385 Ok(bytes_to_write)
386 }
387
388 pub async fn close(&mut self) -> Result<()> {
389 if !self.closed {
390 let extended_block = if let Some(block_writer) = self.block_writer.take() {
391 Some(block_writer.close().await?)
392 } else {
393 None
394 };
395
396 let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
397 let mut retries = 0;
398 while retries < COMPLETE_RETRIES {
399 let successful = self
400 .protocol
401 .complete(&self.src, extended_block.clone(), self.status.file_id)
402 .await?
403 .result;
404
405 if successful {
406 self.closed = true;
407 return Ok(());
408 }
409
410 let sleep = {
412 let _guard = self.handle.enter();
413 tokio::time::sleep(Duration::from_millis(retry_delay))
414 };
415 sleep.await;
416
417 retry_delay *= 2;
418 retries += 1;
419 }
420 Err(HdfsError::OperationFailed(
421 "Failed to complete file in time".to_string(),
422 ))
423 } else {
424 Ok(())
425 }
426 }
427}
428
429impl Drop for FileWriter {
430 fn drop(&mut self) {
431 if !self.closed {
432 warn!(
433 "FileWriter dropped without being closed. File content may not have saved or may not be complete"
434 );
435 }
436
437 self.protocol
438 .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
439 }
440}