// hdfs_native/file.rs

use std::sync::Arc;
use std::time::Duration;

use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::{stream, Stream, StreamExt};
use log::warn;

use crate::common::config::Configuration;
use crate::ec::{resolve_ec_policy, EcSchema};
use crate::hdfs::block_reader::get_block_stream;
use crate::hdfs::block_writer::BlockWriter;
use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
use crate::proto::hdfs;
use crate::{HdfsError, Result};

const COMPLETE_RETRY_DELAY_MS: u64 = 500;
const COMPLETE_RETRIES: u32 = 5;

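/// A reader for an HDFS file, obtained through this crate's `Client`.
///
/// A minimal usage sketch (assumes a reachable NameNode; the URL and path are
/// placeholders):
/// ```no_run
/// # async fn example() -> hdfs_native::Result<()> {
/// let client = hdfs_native::Client::new("hdfs://localhost:9000")?;
/// let mut reader = client.read("/some/file").await?;
/// // Read the entire remaining contents into a single buffer
/// let contents = reader.read(reader.remaining()).await?;
/// # Ok(())
/// # }
/// ```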
pub struct FileReader {
    protocol: Arc<NamenodeProtocol>,
    located_blocks: hdfs::LocatedBlocksProto,
    ec_schema: Option<EcSchema>,
    position: usize,
}

impl FileReader {
    pub(crate) fn new(
        protocol: Arc<NamenodeProtocol>,
        located_blocks: hdfs::LocatedBlocksProto,
        ec_schema: Option<EcSchema>,
    ) -> Self {
        Self {
            protocol,
            located_blocks,
            ec_schema,
            position: 0,
        }
    }

    /// Returns the total size of the file in bytes.
    pub fn file_length(&self) -> usize {
        self.located_blocks.file_length as usize
    }

    /// Returns the number of bytes remaining to read based on the current cursor position.
    pub fn remaining(&self) -> usize {
        if self.position > self.file_length() {
            0
        } else {
            self.file_length() - self.position
        }
    }

    /// Sets the cursor to the given position. Panics if the position is beyond the end of the file.
    pub fn seek(&mut self, pos: usize) {
        if pos > self.file_length() {
            panic!("Cannot seek beyond the end of a file");
        }
        self.position = pos;
    }

    /// Returns the current cursor position in the file.
    pub fn tell(&self) -> usize {
        self.position
    }

    /// Read up to `len` bytes into a new [Bytes] object, advancing the internal position in the file.
    /// An empty [Bytes] object will be returned if the end of the file has been reached.
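    ///
    /// A minimal sketch of sequential reads (assumes `reader` was opened via
    /// `Client::read`; names are placeholders):
    /// ```no_run
    /// # async fn example(mut reader: hdfs_native::file::FileReader) -> hdfs_native::Result<()> {
    /// // Read the file in 64 KiB chunks until an empty buffer signals EOF
    /// loop {
    ///     let chunk = reader.read(64 * 1024).await?;
    ///     if chunk.is_empty() {
    ///         break;
    ///     }
    ///     // process `chunk`...
    /// }
    /// # Ok(())
    /// # }
    /// ```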
    pub async fn read(&mut self, len: usize) -> Result<Bytes> {
        if self.position >= self.file_length() {
            Ok(Bytes::new())
        } else {
            let offset = self.position;
            self.position = usize::min(self.position + len, self.file_length());
            self.read_range(offset, self.position - offset).await
        }
    }

    /// Read up to `buf.len()` bytes into the provided slice, advancing the internal position in the file.
    /// Returns the number of bytes that were read, or 0 if the end of the file has been reached.
    pub async fn read_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
        if self.position >= self.file_length() {
            Ok(0)
        } else {
            let offset = self.position;
            self.position = usize::min(self.position + buf.len(), self.file_length());
            let read_bytes = self.position - offset;
            self.read_range_buf(&mut buf[..read_bytes], offset).await?;
            Ok(read_bytes)
        }
    }

    /// Read `len` bytes starting at `offset` into a new [Bytes] object, without moving the cursor.
    ///
    /// Panics if the requested range extends beyond the end of the file.
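    ///
    /// A minimal positional-read sketch (assumes `reader` was opened via
    /// `Client::read`); because the cursor is untouched, this suits random access:
    /// ```no_run
    /// # async fn example(reader: hdfs_native::file::FileReader) -> hdfs_native::Result<()> {
    /// // Read 100 bytes starting at byte 1024
    /// let data = reader.read_range(1024, 100).await?;
    /// assert_eq!(data.len(), 100);
    /// # Ok(())
    /// # }
    /// ```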
    pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
        let mut stream = self.read_range_stream(offset, len);
        let mut buf = BytesMut::with_capacity(len);
        while let Some(bytes) = stream.next().await.transpose()? {
            buf.put(bytes);
        }
        Ok(buf.freeze())
    }

    /// Read `buf.len()` bytes starting at `offset` into the provided buffer.
    ///
    /// Panics if the requested range extends beyond the end of the file.
    pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
        let mut stream = self.read_range_stream(offset, buf.len());
        while let Some(bytes) = stream.next().await.transpose()? {
            buf.put(bytes);
        }

        Ok(())
    }

    /// Returns a stream of `Bytes` objects containing the content of the requested range.
    ///
    /// Panics if the requested range extends beyond the end of the file.
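    ///
    /// A minimal streaming sketch (assumes `reader` was opened via `Client::read`);
    /// processing chunks as they arrive avoids buffering the whole range in memory:
    /// ```no_run
    /// # async fn example(reader: hdfs_native::file::FileReader) -> hdfs_native::Result<()> {
    /// use futures::StreamExt;
    ///
    /// let mut stream = reader.read_range_stream(0, reader.file_length());
    /// while let Some(chunk) = stream.next().await {
    ///     let chunk = chunk?; // each item is a Result<Bytes>
    ///     // process `chunk`...
    /// }
    /// # Ok(())
    /// # }
    /// ```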
    pub fn read_range_stream(
        &self,
        offset: usize,
        len: usize,
    ) -> impl Stream<Item = Result<Bytes>> {
        if offset + len > self.file_length() {
            panic!("Cannot read past end of the file");
        }

        let block_streams: Vec<BoxStream<Result<Bytes>>> = self
            .located_blocks
            .blocks
            .iter()
            .flat_map(move |block| {
                let block_file_start = block.offset as usize;
                let block_file_end = block_file_start + block.b.num_bytes() as usize;

                if block_file_start < (offset + len) && block_file_end > offset {
                    // We need to read this block
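                    // The math below maps the file-relative range onto this block.
                    // Illustrative example (made-up numbers): for a block covering
                    // file bytes 128..256 and a request of offset = 130, len = 200,
                    // block_start = 130 - 128 = 2 and block_end = 256 - 128 = 128,
                    // so bytes 2..128 of the block are streamed.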
                    let block_start = offset - usize::min(offset, block_file_start);
                    let block_end = usize::min(offset + len, block_file_end) - block_file_start;
                    Some(get_block_stream(
                        Arc::clone(&self.protocol),
                        block.clone(),
                        block_start,
                        block_end - block_start,
                        self.ec_schema.clone(),
                    ))
                } else {
                    // No data is needed from this block
                    None
                }
            })
            .collect();

        stream::iter(block_streams).flatten()
    }
}

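/// A writer for an HDFS file, obtained through this crate's `Client`.
///
/// A minimal create-and-write sketch (assumes a reachable NameNode; the URL and
/// path are placeholders):
/// ```no_run
/// # async fn example() -> hdfs_native::Result<()> {
/// let client = hdfs_native::Client::new("hdfs://localhost:9000")?;
/// let mut writer = client
///     .create("/new/file", hdfs_native::WriteOptions::default())
///     .await?;
/// writer.write(bytes::Bytes::from_static(b"hello, hdfs")).await?;
/// writer.close().await?;
/// # Ok(())
/// # }
/// ```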
pub struct FileWriter {
    src: String,
    protocol: Arc<NamenodeProtocol>,
    status: hdfs::HdfsFileStatusProto,
    config: Arc<Configuration>,
    block_writer: Option<BlockWriter>,
    last_block: Option<hdfs::LocatedBlockProto>,
    closed: bool,
    bytes_written: usize,
}

impl FileWriter {
    pub(crate) fn new(
        protocol: Arc<NamenodeProtocol>,
        src: String,
        status: hdfs::HdfsFileStatusProto,
        config: Arc<Configuration>,
        // Some for append, None for create
        last_block: Option<hdfs::LocatedBlockProto>,
    ) -> Self {
        protocol.add_file_lease(status.file_id(), status.namespace.clone());
        Self {
            protocol,
            src,
            status,
            config,
            block_writer: None,
            last_block,
            closed: false,
            bytes_written: 0,
        }
    }

    async fn create_block_writer(&mut self) -> Result<()> {
        let new_block = if let Some(last_block) = self.last_block.take() {
            // First write of an append operation. Erasure-coded appends always create a new block.
            if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
            {
                // The last block isn't full, so just write data to it
                last_block
            } else {
                // The last block is full, so create a new block to write to
                self.protocol
                    .add_block(&self.src, Some(last_block.b), self.status.file_id)
                    .await?
                    .block
            }
        } else {
            // Not appending to an existing block, just create a new one
            // If there's an existing block writer, close it first
            let extended_block = if let Some(block_writer) = self.block_writer.take() {
                Some(block_writer.close().await?)
            } else {
                None
            };

            self.protocol
                .add_block(&self.src, extended_block, self.status.file_id)
                .await?
                .block
        };

        let block_writer = BlockWriter::new(
            Arc::clone(&self.protocol),
            new_block,
            self.protocol.get_cached_server_defaults().await?,
            Arc::clone(&self.config),
            self.status
                .ec_policy
                .as_ref()
                .map(resolve_ec_policy)
                .transpose()?
                .as_ref(),
            &self.src,
            &self.status,
        )
        .await?;

        self.block_writer = Some(block_writer);
        Ok(())
    }

    async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
        // If the current writer is full, or hasn't been created, create one
        if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
            self.create_block_writer().await?;
        }

        Ok(self.block_writer.as_mut().unwrap())
    }

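    /// Writes the entire contents of `buf` to the file, allocating new blocks as
    /// each one fills. Returns the number of bytes written.
    ///
    /// A minimal sketch of writing in a loop (assumes `writer` was obtained via
    /// `Client::create`; the data source is a placeholder):
    /// ```no_run
    /// # async fn example(mut writer: hdfs_native::file::FileWriter) -> hdfs_native::Result<()> {
    /// for _ in 0..3 {
    ///     let chunk = bytes::Bytes::from_static(b"some data");
    ///     writer.write(chunk).await?;
    /// }
    /// writer.close().await?;
    /// # Ok(())
    /// # }
    /// ```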
    pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
        let bytes_to_write = buf.len();
        // `buf` is a cheap shallow view of the caller's data; each block write
        // consumes from the front of it to track what still needs to be written
        while !buf.is_empty() {
            let block_writer = self.get_block_writer().await?;

            block_writer.write(&mut buf).await?;
        }

        self.bytes_written += bytes_to_write;

        Ok(bytes_to_write)
    }

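    /// Closes the current block writer and asks the NameNode to complete the file.
    /// Completion is retried with exponential backoff (starting at
    /// `COMPLETE_RETRY_DELAY_MS` and doubling, up to `COMPLETE_RETRIES` attempts),
    /// since the NameNode may not consider the file complete until its blocks are
    /// minimally replicated. Calling `close` on an already closed writer is a no-op.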
    pub async fn close(&mut self) -> Result<()> {
        if !self.closed {
            let extended_block = if let Some(block_writer) = self.block_writer.take() {
                Some(block_writer.close().await?)
            } else {
                None
            };

            let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
            let mut retries = 0;
            while retries < COMPLETE_RETRIES {
                let successful = self
                    .protocol
                    .complete(&self.src, extended_block.clone(), self.status.file_id)
                    .await?
                    .result;

                if successful {
                    self.closed = true;
                    return Ok(());
                }

                tokio::time::sleep(Duration::from_millis(retry_delay)).await;

                retry_delay *= 2;
                retries += 1;
            }
            Err(HdfsError::OperationFailed(
                "Failed to complete file in time".to_string(),
            ))
        } else {
            Ok(())
        }
    }
}

impl Drop for FileWriter {
    fn drop(&mut self) {
        if !self.closed {
            warn!("FileWriter dropped without being closed. File content may not have been saved or may be incomplete");
        }

        self.protocol
            .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
    }
}