Skip to main content

hdfs_native/
file.rs

1use std::io::{self, SeekFrom};
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::{BufMut, Bytes, BytesMut};
8use futures::stream::BoxStream;
9use futures::{Stream, StreamExt, stream};
10use log::warn;
11use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
12use tokio::runtime::Handle;
13
14use crate::common::config::Configuration;
15use crate::ec::{EcSchema, resolve_ec_policy};
16use crate::hdfs::block_reader::get_block_stream;
17use crate::hdfs::block_writer::BlockWriter;
18use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
19use crate::proto::hdfs;
20use crate::{HdfsError, Result};
21
/// How many times `FileWriter::close` calls `complete` on the namenode protocol
/// before reporting failure.
const COMPLETE_RETRIES: u32 = 5;
/// Delay before the first `complete` retry, in milliseconds; it doubles after each
/// unsuccessful attempt.
const COMPLETE_RETRY_DELAY_MS: u64 = 500;
24
25fn io_error(error: HdfsError) -> io::Error {
26    io::Error::other(error)
27}
28
29struct PendingRead {
30    stream: BoxStream<'static, Result<Bytes>>,
31    end_position: usize,
32}
33
34pub struct FileReader {
35    protocol: Arc<NamenodeProtocol>,
36    located_blocks: hdfs::LocatedBlocksProto,
37    ec_schema: Option<EcSchema>,
38    position: usize,
39    config: Arc<Configuration>,
40    handle: Handle,
41    pending_read: Option<std::sync::Mutex<PendingRead>>,
42}
43
44impl FileReader {
45    pub(crate) fn new(
46        protocol: Arc<NamenodeProtocol>,
47        located_blocks: hdfs::LocatedBlocksProto,
48        ec_schema: Option<EcSchema>,
49        config: Arc<Configuration>,
50        handle: Handle,
51    ) -> Self {
52        Self {
53            protocol,
54            located_blocks,
55            ec_schema,
56            position: 0,
57            config,
58            handle,
59            pending_read: None,
60        }
61    }
62
63    /// Returns the total size of the file
64    pub fn file_length(&self) -> usize {
65        self.located_blocks.file_length as usize
66    }
67
68    /// Returns the remaining bytes left based on the current cursor position.
69    pub fn remaining(&self) -> usize {
70        if self.position > self.file_length() {
71            0
72        } else {
73            self.file_length() - self.position
74        }
75    }
76
77    /// Sets the cursor to the position. Panics if the position is beyond the end of the file
78    pub fn set_position(&mut self, pos: usize) {
79        if pos > self.file_length() {
80            panic!("Cannot seek beyond the end of a file");
81        }
82        self.pending_read = None;
83        self.position = pos;
84    }
85
86    /// Returns the current cursor position in the file
87    pub fn tell(&self) -> usize {
88        self.position
89    }
90
91    /// Read up to `len` bytes into a new [Bytes] object, advancing the internal position in the file.
92    /// An empty [Bytes] object will be returned if the end of the file has been reached.
93    pub async fn read_bytes(&mut self, len: usize) -> Result<Bytes> {
94        self.pending_read = None;
95        if self.position >= self.file_length() {
96            Ok(Bytes::new())
97        } else {
98            let offset = self.position;
99            self.position = usize::min(self.position + len, self.file_length());
100            self.read_range(offset, self.position - offset).await
101        }
102    }
103
104    /// Read up to `buf.len()` bytes into the provided slice, advancing the internal position in the file.
105    /// Returns the number of bytes that were read, or 0 if the end of the file has been reached.
106    pub async fn read_into(&mut self, buf: &mut [u8]) -> Result<usize> {
107        self.pending_read = None;
108        if self.position >= self.file_length() {
109            Ok(0)
110        } else {
111            let offset = self.position;
112            self.position = usize::min(self.position + buf.len(), self.file_length());
113            let read_bytes = self.position - offset;
114            self.read_range_buf(&mut buf[..read_bytes], offset).await?;
115            Ok(read_bytes)
116        }
117    }
118
119    /// Read up to `len` bytes starting at `offset` into a new [Bytes] object. The returned buffer
120    /// could be smaller than `len` if `offset + len` extends beyond the end of the file.
121    ///
122    /// Panics if the requested range is outside of the file
123    pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
124        let mut stream = self.read_range_stream(offset, len);
125        let mut buf = BytesMut::with_capacity(len);
126        while let Some(bytes) = stream.next().await.transpose()? {
127            buf.put(bytes);
128        }
129        Ok(buf.freeze())
130    }
131
132    /// Read file data into an existing buffer
133    ///
134    /// Panics if the requested range is outside of the file
135    pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
136        let mut stream = self.read_range_stream(offset, buf.len());
137        while let Some(bytes) = stream.next().await.transpose()? {
138            buf.put(bytes);
139        }
140
141        Ok(())
142    }
143
144    /// Return a stream of `Bytes` objects containing the content of the file
145    ///
146    /// Panics if the requested range is outside of the file
147    pub fn read_range_stream(
148        &self,
149        offset: usize,
150        len: usize,
151    ) -> impl Stream<Item = Result<Bytes>> + use<> {
152        if offset + len > self.file_length() {
153            panic!("Cannot read past end of the file");
154        }
155
156        let block_streams: Vec<BoxStream<Result<Bytes>>> = self
157            .located_blocks
158            .blocks
159            .iter()
160            .flat_map(move |block| {
161                let block_file_start = block.offset as usize;
162                let block_file_end = block_file_start + block.b.num_bytes() as usize;
163
164                if block_file_start < (offset + len) && block_file_end > offset {
165                    // We need to read this block
166                    let block_start = offset - usize::min(offset, block_file_start);
167                    let block_end = usize::min(offset + len, block_file_end) - block_file_start;
168                    Some(get_block_stream(
169                        Arc::clone(&self.protocol),
170                        block.clone(),
171                        block_start,
172                        block_end - block_start,
173                        self.ec_schema.clone(),
174                        Arc::clone(&self.config),
175                        self.handle.clone(),
176                    ))
177                } else {
178                    // No data is needed from this block
179                    None
180                }
181            })
182            .collect();
183
184        stream::iter(block_streams).flatten()
185    }
186}
187
188impl AsyncRead for FileReader {
189    fn poll_read(
190        mut self: Pin<&mut Self>,
191        cx: &mut Context<'_>,
192        buf: &mut ReadBuf<'_>,
193    ) -> Poll<io::Result<()>> {
194        if buf.remaining() == 0 {
195            return Poll::Ready(Ok(()));
196        }
197
198        let starting_len = buf.filled().len();
199
200        loop {
201            if self.pending_read.is_none() {
202                if self.position >= self.file_length() {
203                    return Poll::Ready(Ok(()));
204                }
205
206                let offset = self.position;
207                let len = usize::min(buf.remaining(), self.file_length() - self.position);
208                let stream = self.read_range_stream(offset, len).boxed();
209                self.pending_read = Some(std::sync::Mutex::new(PendingRead {
210                    stream,
211                    end_position: offset + len,
212                }));
213            }
214
215            let poll_result = {
216                let mut pending = self.pending_read.as_ref().unwrap().lock().unwrap();
217                Pin::new(&mut pending.stream).poll_next(cx)
218            };
219
220            match poll_result {
221                Poll::Ready(Some(Ok(bytes))) => {
222                    self.position += bytes.len();
223                    buf.put_slice(&bytes);
224
225                    if self.pending_read.as_ref().is_some_and(|pending| {
226                        self.position >= pending.lock().unwrap().end_position
227                    }) {
228                        self.pending_read = None;
229                    }
230
231                    if buf.remaining() == 0 {
232                        return Poll::Ready(Ok(()));
233                    }
234                }
235                Poll::Ready(Some(Err(error))) => {
236                    self.pending_read = None;
237                    return Poll::Ready(Err(io_error(error)));
238                }
239                Poll::Ready(None) => {
240                    self.pending_read = None;
241                    return Poll::Ready(Ok(()));
242                }
243                Poll::Pending => {
244                    if buf.filled().len() > starting_len {
245                        return Poll::Ready(Ok(()));
246                    }
247                    return Poll::Pending;
248                }
249            }
250        }
251    }
252}
253
254impl AsyncSeek for FileReader {
255    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
256        let file_length = self.file_length() as i128;
257        let current = self.tell() as i128;
258        let new_pos = match position {
259            SeekFrom::Start(pos) => i128::from(pos),
260            SeekFrom::End(offset) => file_length + i128::from(offset),
261            SeekFrom::Current(offset) => current + i128::from(offset),
262        };
263
264        if new_pos < 0 || new_pos > file_length {
265            return Err(io::Error::new(
266                io::ErrorKind::InvalidInput,
267                "cannot seek outside of file bounds",
268            ));
269        }
270
271        self.set_position(new_pos as usize);
272        Ok(())
273    }
274
275    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
276        Poll::Ready(Ok(self.tell() as u64))
277    }
278}
279
280pub struct FileWriter {
281    src: String,
282    protocol: Arc<NamenodeProtocol>,
283    status: hdfs::HdfsFileStatusProto,
284    config: Arc<Configuration>,
285    handle: Handle,
286    block_writer: Option<BlockWriter>,
287    last_block: Option<hdfs::LocatedBlockProto>,
288    closed: bool,
289    bytes_written: usize,
290}
291
292impl FileWriter {
293    pub(crate) fn new(
294        protocol: Arc<NamenodeProtocol>,
295        src: String,
296        status: hdfs::HdfsFileStatusProto,
297        config: Arc<Configuration>,
298        handle: Handle,
299        // Some for append, None for create
300        last_block: Option<hdfs::LocatedBlockProto>,
301    ) -> Self {
302        protocol.add_file_lease(status.file_id(), status.namespace.clone());
303        Self {
304            protocol,
305            src,
306            status,
307            config,
308            handle,
309            block_writer: None,
310            last_block,
311            closed: false,
312            bytes_written: 0,
313        }
314    }
315
316    async fn create_block_writer(&mut self) -> Result<()> {
317        let new_block = if let Some(last_block) = self.last_block.take() {
318            // Append operation on first write. Erasure code appends always just create a new block.
319            if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
320            {
321                // The last block isn't full, just write data to it
322                last_block
323            } else {
324                // The last block is full, so create a new block to write to
325                self.protocol
326                    .add_block(&self.src, Some(last_block.b), self.status.file_id)
327                    .await?
328                    .block
329            }
330        } else {
331            // Not appending to an existing block, just create a new one
332            // If there's an existing block writer, close it first
333            let extended_block = if let Some(block_writer) = self.block_writer.take() {
334                Some(block_writer.close().await?)
335            } else {
336                None
337            };
338
339            self.protocol
340                .add_block(&self.src, extended_block, self.status.file_id)
341                .await?
342                .block
343        };
344
345        let block_writer = BlockWriter::new(
346            Arc::clone(&self.protocol),
347            new_block,
348            self.protocol.get_cached_server_defaults().await?,
349            Arc::clone(&self.config),
350            self.handle.clone(),
351            self.status
352                .ec_policy
353                .as_ref()
354                .map(resolve_ec_policy)
355                .transpose()?
356                .as_ref(),
357            &self.src,
358            &self.status,
359        )
360        .await?;
361
362        self.block_writer = Some(block_writer);
363        Ok(())
364    }
365
366    async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
367        // If the current writer is full, or hasn't been created, create one
368        if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
369            self.create_block_writer().await?;
370        }
371
372        Ok(self.block_writer.as_mut().unwrap())
373    }
374
375    pub async fn write_bytes(&mut self, mut buf: Bytes) -> Result<usize> {
376        let bytes_to_write = buf.len();
377        while !buf.is_empty() {
378            let block_writer = self.get_block_writer().await?;
379
380            block_writer.write(&mut buf).await?;
381        }
382
383        self.bytes_written += bytes_to_write;
384
385        Ok(bytes_to_write)
386    }
387
388    pub async fn close(&mut self) -> Result<()> {
389        if !self.closed {
390            let extended_block = if let Some(block_writer) = self.block_writer.take() {
391                Some(block_writer.close().await?)
392            } else {
393                None
394            };
395
396            let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
397            let mut retries = 0;
398            while retries < COMPLETE_RETRIES {
399                let successful = self
400                    .protocol
401                    .complete(&self.src, extended_block.clone(), self.status.file_id)
402                    .await?
403                    .result;
404
405                if successful {
406                    self.closed = true;
407                    return Ok(());
408                }
409
410                // Sleep in the provided runtime in case we are not called from a tokio runtime
411                let sleep = {
412                    let _guard = self.handle.enter();
413                    tokio::time::sleep(Duration::from_millis(retry_delay))
414                };
415                sleep.await;
416
417                retry_delay *= 2;
418                retries += 1;
419            }
420            Err(HdfsError::OperationFailed(
421                "Failed to complete file in time".to_string(),
422            ))
423        } else {
424            Ok(())
425        }
426    }
427}
428
429impl Drop for FileWriter {
430    fn drop(&mut self) {
431        if !self.closed {
432            warn!(
433                "FileWriter dropped without being closed. File content may not have saved or may not be complete"
434            );
435        }
436
437        self.protocol
438            .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
439    }
440}