// hdfs_client/fs/writer.rs

1use std::{
2    io::{self, Read, Write},
3    path::Path,
4    sync::Arc,
5};
6
7use hdfs_types::hdfs::{
8    AddBlockRequestProto, AppendRequestProto, ChecksumTypeProto, CompleteRequestProto,
9    CreateRequestProto, DatanodeIdProto, ExtendedBlockProto, FsPermissionProto,
10    FsServerDefaultsProto, GetServerDefaultsRequestProto, HdfsFileStatusProto,
11};
12
13use crate::{hrpc::HRpc, HDFSError, IOType, HDFS};
14
15use crate::data_transfer::BlockWriteStream;
16
17pub struct FileWriter<S: Read + Write, D: Read + Write> {
18    append: bool,
19    written: u64,
20    block_size: u64,
21    ipc: HRpc<S>,
22    connect_data_node: Arc<dyn Fn(&DatanodeIdProto, IOType) -> io::Result<D>>,
23    fs: HdfsFileStatusProto,
24    default: FsServerDefaultsProto,
25    client_name: String,
26    blk_stream: BlockWriteStream<D>,
27    path: String,
28}
29
30impl<S: Read + Write, D: Read + Write> FileWriter<S, D> {
31    pub fn options() -> WriterOptions {
32        WriterOptions::default()
33    }
34}
35
36impl<S: Read + Write, D: Read + Write> FileWriter<S, D> {
37    pub fn close(mut self) -> Result<(), HDFSError> {
38        let b = self.blk_stream.close(&mut self.ipc)?;
39        let req = CompleteRequestProto {
40            src: self.path.clone(),
41            client_name: self.client_name.clone(),
42            last: Some(b),
43            file_id: self.fs.file_id,
44        };
45        self.ipc.complete(req)?;
46        Ok(())
47    }
48}
49
50impl<S: Read + Write, D: Read + Write> Write for FileWriter<S, D> {
51    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
52        let offset = self.blk_stream.offset;
53        if offset + buf.len() as u64 >= self.block_size {
54            let split_idx = (self.block_size - offset) as usize;
55            let left = &buf[..split_idx];
56            self.blk_stream.write(left, false)?;
57            let mut prev = self.blk_stream.close(&mut self.ipc)?;
58            let remain = &buf[split_idx..];
59            let parts = remain.len().div_ceil(self.block_size as usize);
60            for (idx, chunk) in remain.chunks(self.block_size as usize).enumerate() {
61                let is_last = idx + 1 == parts;
62                let mut blk = create_blk(
63                    &mut self.ipc,
64                    self.client_name.clone(),
65                    self.path.clone(),
66                    &self.fs,
67                    self.connect_data_node.clone(),
68                    &self.default,
69                    Some(prev.clone()),
70                    if self.append {
71                        IOType::Append
72                    } else {
73                        IOType::Write
74                    },
75                )?;
76                blk.write(chunk, false)?;
77                if is_last {
78                    self.blk_stream = blk;
79                } else {
80                    prev = blk.close(&mut self.ipc)?;
81                }
82            }
83        } else {
84            self.blk_stream.write(buf, false)?;
85        }
86        self.written += buf.len() as u64;
87        Ok(buf.len())
88    }
89
90    fn flush(&mut self) -> io::Result<()> {
91        self.blk_stream.stream.flush()
92    }
93}
94
95#[derive(Debug, Default)]
96#[allow(unused)]
97pub struct WriterOptions {
98    pub replica: Option<u32>,
99    pub checksum: Option<ChecksumTypeProto>,
100    pub block_size: Option<u64>,
101    pub perm: Option<u32>,
102    pub unmask: Option<u32>,
103    pub over_ride: bool,
104}
105
106impl WriterOptions {
107    pub fn replica(self, replica: impl Into<Option<u32>>) -> Self {
108        Self {
109            replica: replica.into(),
110            ..self
111        }
112    }
113
114    pub fn checksum(self, checksum: impl Into<Option<ChecksumTypeProto>>) -> Self {
115        Self {
116            checksum: checksum.into(),
117            ..self
118        }
119    }
120
121    pub fn block_size(self, block_size: impl Into<Option<u64>>) -> Self {
122        Self {
123            block_size: block_size.into(),
124            ..self
125        }
126    }
127
128    pub fn append<S: Read + Write, D: Read + Write>(
129        self,
130        path: impl AsRef<Path>,
131        fs: &mut HDFS<S, D>,
132    ) -> Result<FileWriter<S, D>, HDFSError> {
133        let (_, default) = fs
134            .ipc
135            .get_server_defaults(GetServerDefaultsRequestProto {})?;
136        let default = default.server_defaults;
137        let path = path.as_ref().to_string_lossy().to_string();
138        let req = AppendRequestProto {
139            src: path.clone(),
140            client_name: fs.client_name.clone(),
141            ..Default::default()
142        };
143        let (_, resp) = fs.ipc.append(req)?;
144        let fs_status = resp
145            .stat
146            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no fs status in append resp"))?;
147        let blk_stream = match resp.block {
148            Some(block) => {
149                let stream = block.locs.iter().enumerate().find_map(|(idx, loc)| {
150                    match (fs.connect_data_node)(&loc.id, IOType::Append) {
151                        Ok(stream) => Some(stream),
152                        Err(e) => {
153                            tracing::info!(
154                                "try {} location of block {} failed {e}",
155                                idx + 1,
156                                block.b.block_id
157                            );
158                            None
159                        }
160                    }
161                });
162                let stream = stream.ok_or_else(|| HDFSError::NoAvailableLocation)?;
163                let offset = block.b.num_bytes();
164                BlockWriteStream::create(
165                    fs.client_name.clone(),
166                    stream,
167                    block,
168                    default.bytes_per_checksum,
169                    default.checksum_type(),
170                    offset,
171                    true,
172                )?
173            }
174            None => create_blk(
175                &mut fs.ipc,
176                fs.client_name.clone(),
177                path.clone(),
178                &fs_status,
179                fs.connect_data_node.clone(),
180                &default,
181                None,
182                IOType::Write,
183            )?,
184        };
185
186        Ok(FileWriter {
187            append: true,
188            written: 0,
189            block_size: self.block_size.unwrap_or(default.block_size),
190            ipc: (fs.create_ipc)()?,
191            connect_data_node: fs.connect_data_node.clone(),
192            client_name: fs.client_name.clone(),
193            fs: fs_status,
194            default,
195            blk_stream,
196            path,
197        })
198    }
199
200    pub fn create<S: Read + Write, D: Read + Write>(
201        self,
202        path: impl AsRef<Path>,
203        fs: &mut HDFS<S, D>,
204    ) -> Result<FileWriter<S, D>, HDFSError> {
205        let (_, default) = fs
206            .ipc
207            .get_server_defaults(GetServerDefaultsRequestProto {})?;
208        let default = default.server_defaults;
209        let path = path.as_ref().to_string_lossy().to_string();
210        let req = CreateRequestProto {
211            src: path.clone(),
212            masked: FsPermissionProto {
213                perm: self.perm.unwrap_or(0o644),
214            },
215            unmasked: self.unmask.map(|u| FsPermissionProto { perm: u }),
216            client_name: fs.client_name.clone(),
217            create_flag: 1,
218            create_parent: false,
219            replication: self.replica.unwrap_or(default.replication),
220            block_size: self.block_size.unwrap_or(default.block_size),
221            ..Default::default()
222        };
223        let (_, resp) = fs.ipc.create(req)?;
224        let fs_status = resp.fs.ok_or_else(|| HDFSError::EmptyFS)?;
225
226        let active_blk = create_blk(
227            &mut fs.ipc,
228            fs.client_name.clone(),
229            path.clone(),
230            &fs_status,
231            fs.connect_data_node.clone(),
232            &default,
233            None,
234            IOType::Write,
235        )?;
236
237        Ok(FileWriter {
238            append: false,
239            written: 0,
240            block_size: self.block_size.unwrap_or(default.block_size),
241            ipc: (fs.create_ipc)()?,
242            connect_data_node: fs.connect_data_node.clone(),
243            client_name: fs.client_name.clone(),
244            fs: fs_status,
245            default,
246            blk_stream: active_blk,
247            path,
248        })
249    }
250}
251
252#[allow(clippy::too_many_arguments)]
253fn create_blk<S: Read + Write, D: Read + Write>(
254    ipc: &mut HRpc<S>,
255    client_name: String,
256    path: String,
257    fs_status: &HdfsFileStatusProto,
258    conn_fn: Arc<dyn Fn(&DatanodeIdProto, IOType) -> Result<D, io::Error>>,
259    default: &FsServerDefaultsProto,
260    previous: Option<ExtendedBlockProto>,
261    io_ty: IOType,
262) -> Result<BlockWriteStream<D>, HDFSError> {
263    let req = AddBlockRequestProto {
264        src: path.clone(),
265        client_name: client_name.clone(),
266        previous,
267        file_id: fs_status.file_id,
268        ..Default::default()
269    };
270    let (_, resp) = ipc.add_block(req)?;
271    let new_blk = resp.block;
272    let stream =
273        new_blk
274            .locs
275            .iter()
276            .enumerate()
277            .find_map(|(idx, loc)| match conn_fn(&loc.id, io_ty) {
278                Ok(stream) => Some(stream),
279                Err(e) => {
280                    tracing::info!(
281                        "try {} location of block {} failed {e}",
282                        idx + 1,
283                        new_blk.b.block_id
284                    );
285                    None
286                }
287            });
288    let stream = stream.ok_or_else(|| HDFSError::NoAvailableLocation)?;
289    let blk_stream = BlockWriteStream::create(
290        client_name.clone(),
291        stream,
292        new_blk,
293        default.bytes_per_checksum,
294        default.checksum_type(),
295        0,
296        matches!(io_ty, IOType::Append),
297    )?;
298    Ok(blk_stream)
299}
300
301impl<S: Read + Write, D: Read + Write> Drop for FileWriter<S, D> {
302    fn drop(&mut self) {
303        if let Ok(b) = self.blk_stream.close(&mut self.ipc) {
304            let req = CompleteRequestProto {
305                src: self.path.clone(),
306                client_name: self.client_name.clone(),
307                last: Some(b),
308                file_id: self.fs.file_id,
309            };
310            self.ipc.complete(req).ok();
311        }
312    }
313}