1use std::sync::Arc;
2use std::time::Duration;
3
4use bytes::{BufMut, Bytes, BytesMut};
5use futures::stream::BoxStream;
6use futures::{stream, Stream, StreamExt};
7use log::warn;
8
9use crate::common::config::Configuration;
10use crate::ec::{resolve_ec_policy, EcSchema};
11use crate::hdfs::block_reader::get_block_stream;
12use crate::hdfs::block_writer::BlockWriter;
13use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
14use crate::proto::hdfs;
15use crate::{HdfsError, Result};
16
/// Initial delay, in milliseconds, before retrying the namenode `complete` call
/// when closing a file; doubled after each failed attempt (see `FileWriter::close`).
const COMPLETE_RETRY_DELAY_MS: u64 = 500;
/// Maximum number of `complete` attempts before `FileWriter::close` gives up.
const COMPLETE_RETRIES: u32 = 5;
19
/// Reader for an HDFS file that tracks a byte cursor within the file.
pub struct FileReader {
    // Namenode RPC client used to build per-block read streams.
    protocol: Arc<NamenodeProtocol>,
    // Block locations for the file as reported by the namenode.
    located_blocks: hdfs::LocatedBlocksProto,
    // Erasure-coding schema when present; `None` presumably means a
    // plain replicated file — passed through to `get_block_stream`.
    ec_schema: Option<EcSchema>,
    // Current read position, in bytes from the start of the file.
    position: usize,
}
26
27impl FileReader {
28 pub(crate) fn new(
29 protocol: Arc<NamenodeProtocol>,
30 located_blocks: hdfs::LocatedBlocksProto,
31 ec_schema: Option<EcSchema>,
32 ) -> Self {
33 Self {
34 protocol,
35 located_blocks,
36 ec_schema,
37 position: 0,
38 }
39 }
40
41 pub fn file_length(&self) -> usize {
43 self.located_blocks.file_length as usize
44 }
45
46 pub fn remaining(&self) -> usize {
48 if self.position > self.file_length() {
49 0
50 } else {
51 self.file_length() - self.position
52 }
53 }
54
55 pub fn seek(&mut self, pos: usize) {
57 if pos > self.file_length() {
58 panic!("Cannot seek beyond the end of a file");
59 }
60 self.position = pos;
61 }
62
63 pub fn tell(&self) -> usize {
65 self.position
66 }
67
68 pub async fn read(&mut self, len: usize) -> Result<Bytes> {
71 if self.position >= self.file_length() {
72 Ok(Bytes::new())
73 } else {
74 let offset = self.position;
75 self.position = usize::min(self.position + len, self.file_length());
76 self.read_range(offset, self.position - offset).await
77 }
78 }
79
80 pub async fn read_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
83 if self.position >= self.file_length() {
84 Ok(0)
85 } else {
86 let offset = self.position;
87 self.position = usize::min(self.position + buf.len(), self.file_length());
88 let read_bytes = self.position - offset;
89 self.read_range_buf(&mut buf[..read_bytes], offset).await?;
90 Ok(read_bytes)
91 }
92 }
93
94 pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
99 let mut stream = self.read_range_stream(offset, len);
100 let mut buf = BytesMut::with_capacity(len);
101 while let Some(bytes) = stream.next().await.transpose()? {
102 buf.put(bytes);
103 }
104 Ok(buf.freeze())
105 }
106
107 pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
111 let mut stream = self.read_range_stream(offset, buf.len());
112 while let Some(bytes) = stream.next().await.transpose()? {
113 buf.put(bytes);
114 }
115
116 Ok(())
117 }
118
119 pub fn read_range_stream(
123 &self,
124 offset: usize,
125 len: usize,
126 ) -> impl Stream<Item = Result<Bytes>> {
127 if offset + len > self.file_length() {
128 panic!("Cannot read past end of the file");
129 }
130
131 let block_streams: Vec<BoxStream<Result<Bytes>>> = self
132 .located_blocks
133 .blocks
134 .iter()
135 .flat_map(move |block| {
136 let block_file_start = block.offset as usize;
137 let block_file_end = block_file_start + block.b.num_bytes() as usize;
138
139 if block_file_start < (offset + len) && block_file_end > offset {
140 let block_start = offset - usize::min(offset, block_file_start);
142 let block_end = usize::min(offset + len, block_file_end) - block_file_start;
143 Some(get_block_stream(
144 Arc::clone(&self.protocol),
145 block.clone(),
146 block_start,
147 block_end - block_start,
148 self.ec_schema.clone(),
149 ))
150 } else {
151 None
153 }
154 })
155 .collect();
156
157 stream::iter(block_streams).flatten()
158 }
159}
160
/// Writer for an HDFS file; holds a file lease for its lifetime.
pub struct FileWriter {
    // Path of the file being written, as passed to namenode RPCs.
    src: String,
    // Namenode RPC client (add_block/complete and lease management).
    protocol: Arc<NamenodeProtocol>,
    // File status from create/append; supplies file_id, blocksize, ec_policy.
    status: hdfs::HdfsFileStatusProto,
    // Client configuration forwarded to each `BlockWriter`.
    config: Arc<Configuration>,
    // Writer for the block currently being filled, if any.
    block_writer: Option<BlockWriter>,
    // Existing last block to append into, if the file was opened for append.
    last_block: Option<hdfs::LocatedBlockProto>,
    // Set once `close` successfully completes the file on the namenode.
    closed: bool,
    // Running total of bytes accepted by `write`.
    bytes_written: usize,
}
171
172impl FileWriter {
173 pub(crate) fn new(
174 protocol: Arc<NamenodeProtocol>,
175 src: String,
176 status: hdfs::HdfsFileStatusProto,
177 config: Arc<Configuration>,
178 last_block: Option<hdfs::LocatedBlockProto>,
180 ) -> Self {
181 protocol.add_file_lease(status.file_id(), status.namespace.clone());
182 Self {
183 protocol,
184 src,
185 status,
186 config,
187 block_writer: None,
188 last_block,
189 closed: false,
190 bytes_written: 0,
191 }
192 }
193
194 async fn create_block_writer(&mut self) -> Result<()> {
195 let new_block = if let Some(last_block) = self.last_block.take() {
196 if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
198 {
199 last_block
201 } else {
202 self.protocol
204 .add_block(&self.src, Some(last_block.b), self.status.file_id)
205 .await?
206 .block
207 }
208 } else {
209 let extended_block = if let Some(block_writer) = self.block_writer.take() {
212 Some(block_writer.close().await?)
213 } else {
214 None
215 };
216
217 self.protocol
218 .add_block(&self.src, extended_block, self.status.file_id)
219 .await?
220 .block
221 };
222
223 let block_writer = BlockWriter::new(
224 Arc::clone(&self.protocol),
225 new_block,
226 self.protocol.get_cached_server_defaults().await?,
227 Arc::clone(&self.config),
228 self.status
229 .ec_policy
230 .as_ref()
231 .map(resolve_ec_policy)
232 .transpose()?
233 .as_ref(),
234 &self.src,
235 &self.status,
236 )
237 .await?;
238
239 self.block_writer = Some(block_writer);
240 Ok(())
241 }
242
243 async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
244 if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
246 self.create_block_writer().await?;
247 }
248
249 Ok(self.block_writer.as_mut().unwrap())
250 }
251
252 pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
253 let bytes_to_write = buf.len();
254 while !buf.is_empty() {
256 let block_writer = self.get_block_writer().await?;
257
258 block_writer.write(&mut buf).await?;
259 }
260
261 self.bytes_written += bytes_to_write;
262
263 Ok(bytes_to_write)
264 }
265
266 pub async fn close(&mut self) -> Result<()> {
267 if !self.closed {
268 let extended_block = if let Some(block_writer) = self.block_writer.take() {
269 Some(block_writer.close().await?)
270 } else {
271 None
272 };
273
274 let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
275 let mut retries = 0;
276 while retries < COMPLETE_RETRIES {
277 let successful = self
278 .protocol
279 .complete(&self.src, extended_block.clone(), self.status.file_id)
280 .await?
281 .result;
282
283 if successful {
284 self.closed = true;
285 return Ok(());
286 }
287
288 tokio::time::sleep(Duration::from_millis(retry_delay)).await;
289
290 retry_delay *= 2;
291 retries += 1;
292 }
293 Err(HdfsError::OperationFailed(
294 "Failed to complete file in time".to_string(),
295 ))
296 } else {
297 Ok(())
298 }
299 }
300}
301
impl Drop for FileWriter {
    fn drop(&mut self) {
        // Dropping without `close` means the file was never completed on the
        // namenode, so buffered or in-flight data may be lost — warn loudly.
        if !self.closed {
            warn!("FileWriter dropped without being closed. File content may not have saved or may not be complete");
        }

        // Always release the lease registered in `FileWriter::new`, even if
        // the file was closed normally.
        self.protocol
            .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
    }
}