hdfs_native/
file.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use bytes::{BufMut, Bytes, BytesMut};
5use futures::stream::BoxStream;
6use futures::{stream, Stream, StreamExt};
7use log::warn;
8use tokio::runtime::Handle;
9
10use crate::common::config::Configuration;
11use crate::ec::{resolve_ec_policy, EcSchema};
12use crate::hdfs::block_reader::get_block_stream;
13use crate::hdfs::block_writer::BlockWriter;
14use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
15use crate::proto::hdfs;
16use crate::{HdfsError, Result};
17
/// Initial back-off, in milliseconds, after an unsuccessful `complete` call in
/// `FileWriter::close`; the delay is doubled after each further unsuccessful attempt.
const COMPLETE_RETRY_DELAY_MS: u64 = 500;
/// Maximum number of `complete` calls `FileWriter::close` makes before reporting failure.
const COMPLETE_RETRIES: u32 = 5;
20
21pub struct FileReader {
22    protocol: Arc<NamenodeProtocol>,
23    located_blocks: hdfs::LocatedBlocksProto,
24    ec_schema: Option<EcSchema>,
25    position: usize,
26    config: Arc<Configuration>,
27    handle: Handle,
28}
29
30impl FileReader {
31    pub(crate) fn new(
32        protocol: Arc<NamenodeProtocol>,
33        located_blocks: hdfs::LocatedBlocksProto,
34        ec_schema: Option<EcSchema>,
35        config: Arc<Configuration>,
36        handle: Handle,
37    ) -> Self {
38        Self {
39            protocol,
40            located_blocks,
41            ec_schema,
42            position: 0,
43            config,
44            handle,
45        }
46    }
47
48    /// Returns the total size of the file
49    pub fn file_length(&self) -> usize {
50        self.located_blocks.file_length as usize
51    }
52
53    /// Returns the remaining bytes left based on the current cursor position.
54    pub fn remaining(&self) -> usize {
55        if self.position > self.file_length() {
56            0
57        } else {
58            self.file_length() - self.position
59        }
60    }
61
62    /// Sets the cursor to the position. Panics if the position is beyond the end of the file
63    pub fn seek(&mut self, pos: usize) {
64        if pos > self.file_length() {
65            panic!("Cannot seek beyond the end of a file");
66        }
67        self.position = pos;
68    }
69
70    /// Returns the current cursor position in the file
71    pub fn tell(&self) -> usize {
72        self.position
73    }
74
75    /// Read up to `len` bytes into a new [Bytes] object, advancing the internal position in the file.
76    /// An empty [Bytes] object will be returned if the end of the file has been reached.
77    pub async fn read(&mut self, len: usize) -> Result<Bytes> {
78        if self.position >= self.file_length() {
79            Ok(Bytes::new())
80        } else {
81            let offset = self.position;
82            self.position = usize::min(self.position + len, self.file_length());
83            self.read_range(offset, self.position - offset).await
84        }
85    }
86
87    /// Read up to `buf.len()` bytes into the provided slice, advancing the internal position in the file.
88    /// Returns the number of bytes that were read, or 0 if the end of the file has been reached.
89    pub async fn read_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
90        if self.position >= self.file_length() {
91            Ok(0)
92        } else {
93            let offset = self.position;
94            self.position = usize::min(self.position + buf.len(), self.file_length());
95            let read_bytes = self.position - offset;
96            self.read_range_buf(&mut buf[..read_bytes], offset).await?;
97            Ok(read_bytes)
98        }
99    }
100
101    /// Read up to `len` bytes starting at `offset` into a new [Bytes] object. The returned buffer
102    /// could be smaller than `len` if `offset + len` extends beyond the end of the file.
103    ///
104    /// Panics if the requested range is outside of the file
105    pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
106        let mut stream = self.read_range_stream(offset, len);
107        let mut buf = BytesMut::with_capacity(len);
108        while let Some(bytes) = stream.next().await.transpose()? {
109            buf.put(bytes);
110        }
111        Ok(buf.freeze())
112    }
113
114    /// Read file data into an existing buffer
115    ///
116    /// Panics if the requested range is outside of the file
117    pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
118        let mut stream = self.read_range_stream(offset, buf.len());
119        while let Some(bytes) = stream.next().await.transpose()? {
120            buf.put(bytes);
121        }
122
123        Ok(())
124    }
125
126    /// Return a stream of `Bytes` objects containing the content of the file
127    ///
128    /// Panics if the requested range is outside of the file
129    pub fn read_range_stream(
130        &self,
131        offset: usize,
132        len: usize,
133    ) -> impl Stream<Item = Result<Bytes>> {
134        if offset + len > self.file_length() {
135            panic!("Cannot read past end of the file");
136        }
137
138        let block_streams: Vec<BoxStream<Result<Bytes>>> = self
139            .located_blocks
140            .blocks
141            .iter()
142            .flat_map(move |block| {
143                let block_file_start = block.offset as usize;
144                let block_file_end = block_file_start + block.b.num_bytes() as usize;
145
146                if block_file_start < (offset + len) && block_file_end > offset {
147                    // We need to read this block
148                    let block_start = offset - usize::min(offset, block_file_start);
149                    let block_end = usize::min(offset + len, block_file_end) - block_file_start;
150                    Some(get_block_stream(
151                        Arc::clone(&self.protocol),
152                        block.clone(),
153                        block_start,
154                        block_end - block_start,
155                        self.ec_schema.clone(),
156                        Arc::clone(&self.config),
157                        self.handle.clone(),
158                    ))
159                } else {
160                    // No data is needed from this block
161                    None
162                }
163            })
164            .collect();
165
166        stream::iter(block_streams).flatten()
167    }
168}
169
170pub struct FileWriter {
171    src: String,
172    protocol: Arc<NamenodeProtocol>,
173    status: hdfs::HdfsFileStatusProto,
174    config: Arc<Configuration>,
175    handle: Handle,
176    block_writer: Option<BlockWriter>,
177    last_block: Option<hdfs::LocatedBlockProto>,
178    closed: bool,
179    bytes_written: usize,
180}
181
182impl FileWriter {
183    pub(crate) fn new(
184        protocol: Arc<NamenodeProtocol>,
185        src: String,
186        status: hdfs::HdfsFileStatusProto,
187        config: Arc<Configuration>,
188        handle: Handle,
189        // Some for append, None for create
190        last_block: Option<hdfs::LocatedBlockProto>,
191    ) -> Self {
192        protocol.add_file_lease(status.file_id(), status.namespace.clone());
193        Self {
194            protocol,
195            src,
196            status,
197            config,
198            handle,
199            block_writer: None,
200            last_block,
201            closed: false,
202            bytes_written: 0,
203        }
204    }
205
206    async fn create_block_writer(&mut self) -> Result<()> {
207        let new_block = if let Some(last_block) = self.last_block.take() {
208            // Append operation on first write. Erasure code appends always just create a new block.
209            if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
210            {
211                // The last block isn't full, just write data to it
212                last_block
213            } else {
214                // The last block is full, so create a new block to write to
215                self.protocol
216                    .add_block(&self.src, Some(last_block.b), self.status.file_id)
217                    .await?
218                    .block
219            }
220        } else {
221            // Not appending to an existing block, just create a new one
222            // If there's an existing block writer, close it first
223            let extended_block = if let Some(block_writer) = self.block_writer.take() {
224                Some(block_writer.close().await?)
225            } else {
226                None
227            };
228
229            self.protocol
230                .add_block(&self.src, extended_block, self.status.file_id)
231                .await?
232                .block
233        };
234
235        let block_writer = BlockWriter::new(
236            Arc::clone(&self.protocol),
237            new_block,
238            self.protocol.get_cached_server_defaults().await?,
239            Arc::clone(&self.config),
240            self.handle.clone(),
241            self.status
242                .ec_policy
243                .as_ref()
244                .map(resolve_ec_policy)
245                .transpose()?
246                .as_ref(),
247            &self.src,
248            &self.status,
249        )
250        .await?;
251
252        self.block_writer = Some(block_writer);
253        Ok(())
254    }
255
256    async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
257        // If the current writer is full, or hasn't been created, create one
258        if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
259            self.create_block_writer().await?;
260        }
261
262        Ok(self.block_writer.as_mut().unwrap())
263    }
264
265    pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
266        let bytes_to_write = buf.len();
267        // Create a shallow copy of the bytes instance to mutate and track what's been read
268        while !buf.is_empty() {
269            let block_writer = self.get_block_writer().await?;
270
271            block_writer.write(&mut buf).await?;
272        }
273
274        self.bytes_written += bytes_to_write;
275
276        Ok(bytes_to_write)
277    }
278
279    pub async fn close(&mut self) -> Result<()> {
280        if !self.closed {
281            let extended_block = if let Some(block_writer) = self.block_writer.take() {
282                Some(block_writer.close().await?)
283            } else {
284                None
285            };
286
287            let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
288            let mut retries = 0;
289            while retries < COMPLETE_RETRIES {
290                let successful = self
291                    .protocol
292                    .complete(&self.src, extended_block.clone(), self.status.file_id)
293                    .await?
294                    .result;
295
296                if successful {
297                    self.closed = true;
298                    return Ok(());
299                }
300
301                // Sleep in the provided runtime in case we are not called from a tokio runtime
302                let sleep = {
303                    let _guard = self.handle.enter();
304                    tokio::time::sleep(Duration::from_millis(retry_delay))
305                };
306                sleep.await;
307
308                retry_delay *= 2;
309                retries += 1;
310            }
311            Err(HdfsError::OperationFailed(
312                "Failed to complete file in time".to_string(),
313            ))
314        } else {
315            Ok(())
316        }
317    }
318}
319
320impl Drop for FileWriter {
321    fn drop(&mut self) {
322        if !self.closed {
323            warn!("FileWriter dropped without being closed. File content may not have saved or may not be complete");
324        }
325
326        self.protocol
327            .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
328    }
329}