1use std::{
2 io::{self, Read, Write},
3 path::Path,
4 sync::Arc,
5};
6
7use hdfs_types::hdfs::{
8 AddBlockRequestProto, AppendRequestProto, ChecksumTypeProto, CompleteRequestProto,
9 CreateRequestProto, DatanodeIdProto, ExtendedBlockProto, FsPermissionProto,
10 FsServerDefaultsProto, GetServerDefaultsRequestProto, HdfsFileStatusProto,
11};
12
13use crate::{hrpc::HRpc, HDFSError, IOType, HDFS};
14
15use crate::data_transfer::BlockWriteStream;
16
/// A writer for a single HDFS file. Data is streamed into the currently
/// open block (`blk_stream`); when a block fills up, the `Write` impl
/// closes it and asks the namenode for a new one.
///
/// `S` is the namenode IPC transport, `D` the datanode data transport.
pub struct FileWriter<S: Read + Write, D: Read + Write> {
    /// True when the writer was opened via `WriterOptions::append`.
    append: bool,
    /// Total number of bytes accepted by `Write::write` so far.
    written: u64,
    /// Effective block size for this file (option override or server default).
    block_size: u64,
    /// Dedicated namenode RPC connection owned by this writer.
    ipc: HRpc<S>,
    /// Factory used to open a data connection to a given datanode.
    connect_data_node: Arc<dyn Fn(&DatanodeIdProto, IOType) -> io::Result<D>>,
    /// File status returned by the namenode on create/append (source of `file_id`).
    fs: HdfsFileStatusProto,
    /// Server defaults (checksum type, bytes-per-checksum, …) fetched at open time.
    default: FsServerDefaultsProto,
    /// Client name sent with every namenode request.
    client_name: String,
    /// Stream for the block currently being written.
    blk_stream: BlockWriteStream<D>,
    /// Path of the file on HDFS, as sent to the namenode.
    path: String,
}
29
30impl<S: Read + Write, D: Read + Write> FileWriter<S, D> {
31 pub fn options() -> WriterOptions {
32 WriterOptions::default()
33 }
34}
35
impl<S: Read + Write, D: Read + Write> FileWriter<S, D> {
    /// Finishes the file: closes the currently open block stream, then
    /// sends `complete` to the namenode so the file is sealed.
    ///
    /// # Errors
    /// Returns an error if closing the block stream or the `complete`
    /// RPC fails.
    pub fn close(mut self) -> Result<(), HDFSError> {
        // Finalize the active block; `b` is its final extent, reported
        // to the namenode as the file's last block.
        let b = self.blk_stream.close(&mut self.ipc)?;
        let req = CompleteRequestProto {
            src: self.path.clone(),
            client_name: self.client_name.clone(),
            last: Some(b),
            file_id: self.fs.file_id,
        };
        self.ipc.complete(req)?;
        // NOTE(review): `self` is dropped here, so `Drop::drop` runs and
        // attempts `blk_stream.close` + `complete` a *second* time. This
        // is only harmless if closing an already-closed BlockWriteStream
        // returns Err — confirm against BlockWriteStream::close.
        Ok(())
    }
}
49
50impl<S: Read + Write, D: Read + Write> Write for FileWriter<S, D> {
51 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
52 let offset = self.blk_stream.offset;
53 if offset + buf.len() as u64 >= self.block_size {
54 let split_idx = (self.block_size - offset) as usize;
55 let left = &buf[..split_idx];
56 self.blk_stream.write(left, false)?;
57 let mut prev = self.blk_stream.close(&mut self.ipc)?;
58 let remain = &buf[split_idx..];
59 let parts = remain.len().div_ceil(self.block_size as usize);
60 for (idx, chunk) in remain.chunks(self.block_size as usize).enumerate() {
61 let is_last = idx + 1 == parts;
62 let mut blk = create_blk(
63 &mut self.ipc,
64 self.client_name.clone(),
65 self.path.clone(),
66 &self.fs,
67 self.connect_data_node.clone(),
68 &self.default,
69 Some(prev.clone()),
70 if self.append {
71 IOType::Append
72 } else {
73 IOType::Write
74 },
75 )?;
76 blk.write(chunk, false)?;
77 if is_last {
78 self.blk_stream = blk;
79 } else {
80 prev = blk.close(&mut self.ipc)?;
81 }
82 }
83 } else {
84 self.blk_stream.write(buf, false)?;
85 }
86 self.written += buf.len() as u64;
87 Ok(buf.len())
88 }
89
90 fn flush(&mut self) -> io::Result<()> {
91 self.blk_stream.stream.flush()
92 }
93}
94
/// Options for opening an HDFS file for writing. All fields default to
/// "unset"; unset values fall back to the namenode's server defaults at
/// open time.
#[derive(Debug, Default)]
#[allow(unused)]
pub struct WriterOptions {
    /// Replication factor; `None` uses the server default.
    pub replica: Option<u32>,
    /// Checksum algorithm for data transfer; `None` uses the server default.
    pub checksum: Option<ChecksumTypeProto>,
    /// Block size in bytes; `None` uses the server default.
    pub block_size: Option<u64>,
    /// Permission bits for the created file; `None` defaults to `0o644`.
    pub perm: Option<u32>,
    /// Unmasked permission bits forwarded to the namenode, if any.
    pub unmask: Option<u32>,
    /// Whether an existing file at the target path should be overwritten.
    pub over_ride: bool,
}
105
106impl WriterOptions {
107 pub fn replica(self, replica: impl Into<Option<u32>>) -> Self {
108 Self {
109 replica: replica.into(),
110 ..self
111 }
112 }
113
114 pub fn checksum(self, checksum: impl Into<Option<ChecksumTypeProto>>) -> Self {
115 Self {
116 checksum: checksum.into(),
117 ..self
118 }
119 }
120
121 pub fn block_size(self, block_size: impl Into<Option<u64>>) -> Self {
122 Self {
123 block_size: block_size.into(),
124 ..self
125 }
126 }
127
128 pub fn append<S: Read + Write, D: Read + Write>(
129 self,
130 path: impl AsRef<Path>,
131 fs: &mut HDFS<S, D>,
132 ) -> Result<FileWriter<S, D>, HDFSError> {
133 let (_, default) = fs
134 .ipc
135 .get_server_defaults(GetServerDefaultsRequestProto {})?;
136 let default = default.server_defaults;
137 let path = path.as_ref().to_string_lossy().to_string();
138 let req = AppendRequestProto {
139 src: path.clone(),
140 client_name: fs.client_name.clone(),
141 ..Default::default()
142 };
143 let (_, resp) = fs.ipc.append(req)?;
144 let fs_status = resp
145 .stat
146 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no fs status in append resp"))?;
147 let blk_stream = match resp.block {
148 Some(block) => {
149 let stream = block.locs.iter().enumerate().find_map(|(idx, loc)| {
150 match (fs.connect_data_node)(&loc.id, IOType::Append) {
151 Ok(stream) => Some(stream),
152 Err(e) => {
153 tracing::info!(
154 "try {} location of block {} failed {e}",
155 idx + 1,
156 block.b.block_id
157 );
158 None
159 }
160 }
161 });
162 let stream = stream.ok_or_else(|| HDFSError::NoAvailableLocation)?;
163 let offset = block.b.num_bytes();
164 BlockWriteStream::create(
165 fs.client_name.clone(),
166 stream,
167 block,
168 default.bytes_per_checksum,
169 default.checksum_type(),
170 offset,
171 true,
172 )?
173 }
174 None => create_blk(
175 &mut fs.ipc,
176 fs.client_name.clone(),
177 path.clone(),
178 &fs_status,
179 fs.connect_data_node.clone(),
180 &default,
181 None,
182 IOType::Write,
183 )?,
184 };
185
186 Ok(FileWriter {
187 append: true,
188 written: 0,
189 block_size: self.block_size.unwrap_or(default.block_size),
190 ipc: (fs.create_ipc)()?,
191 connect_data_node: fs.connect_data_node.clone(),
192 client_name: fs.client_name.clone(),
193 fs: fs_status,
194 default,
195 blk_stream,
196 path,
197 })
198 }
199
200 pub fn create<S: Read + Write, D: Read + Write>(
201 self,
202 path: impl AsRef<Path>,
203 fs: &mut HDFS<S, D>,
204 ) -> Result<FileWriter<S, D>, HDFSError> {
205 let (_, default) = fs
206 .ipc
207 .get_server_defaults(GetServerDefaultsRequestProto {})?;
208 let default = default.server_defaults;
209 let path = path.as_ref().to_string_lossy().to_string();
210 let req = CreateRequestProto {
211 src: path.clone(),
212 masked: FsPermissionProto {
213 perm: self.perm.unwrap_or(0o644),
214 },
215 unmasked: self.unmask.map(|u| FsPermissionProto { perm: u }),
216 client_name: fs.client_name.clone(),
217 create_flag: 1,
218 create_parent: false,
219 replication: self.replica.unwrap_or(default.replication),
220 block_size: self.block_size.unwrap_or(default.block_size),
221 ..Default::default()
222 };
223 let (_, resp) = fs.ipc.create(req)?;
224 let fs_status = resp.fs.ok_or_else(|| HDFSError::EmptyFS)?;
225
226 let active_blk = create_blk(
227 &mut fs.ipc,
228 fs.client_name.clone(),
229 path.clone(),
230 &fs_status,
231 fs.connect_data_node.clone(),
232 &default,
233 None,
234 IOType::Write,
235 )?;
236
237 Ok(FileWriter {
238 append: false,
239 written: 0,
240 block_size: self.block_size.unwrap_or(default.block_size),
241 ipc: (fs.create_ipc)()?,
242 connect_data_node: fs.connect_data_node.clone(),
243 client_name: fs.client_name.clone(),
244 fs: fs_status,
245 default,
246 blk_stream: active_blk,
247 path,
248 })
249 }
250}
251
252#[allow(clippy::too_many_arguments)]
253fn create_blk<S: Read + Write, D: Read + Write>(
254 ipc: &mut HRpc<S>,
255 client_name: String,
256 path: String,
257 fs_status: &HdfsFileStatusProto,
258 conn_fn: Arc<dyn Fn(&DatanodeIdProto, IOType) -> Result<D, io::Error>>,
259 default: &FsServerDefaultsProto,
260 previous: Option<ExtendedBlockProto>,
261 io_ty: IOType,
262) -> Result<BlockWriteStream<D>, HDFSError> {
263 let req = AddBlockRequestProto {
264 src: path.clone(),
265 client_name: client_name.clone(),
266 previous,
267 file_id: fs_status.file_id,
268 ..Default::default()
269 };
270 let (_, resp) = ipc.add_block(req)?;
271 let new_blk = resp.block;
272 let stream =
273 new_blk
274 .locs
275 .iter()
276 .enumerate()
277 .find_map(|(idx, loc)| match conn_fn(&loc.id, io_ty) {
278 Ok(stream) => Some(stream),
279 Err(e) => {
280 tracing::info!(
281 "try {} location of block {} failed {e}",
282 idx + 1,
283 new_blk.b.block_id
284 );
285 None
286 }
287 });
288 let stream = stream.ok_or_else(|| HDFSError::NoAvailableLocation)?;
289 let blk_stream = BlockWriteStream::create(
290 client_name.clone(),
291 stream,
292 new_blk,
293 default.bytes_per_checksum,
294 default.checksum_type(),
295 0,
296 matches!(io_ty, IOType::Append),
297 )?;
298 Ok(blk_stream)
299}
300
impl<S: Read + Write, D: Read + Write> Drop for FileWriter<S, D> {
    /// Best-effort finalisation: tries to close the active block and
    /// `complete` the file so data is not lost when the writer is
    /// dropped without an explicit `close()`. All errors are swallowed,
    /// as `drop` cannot report them.
    fn drop(&mut self) {
        // NOTE(review): this also runs at the end of `FileWriter::close`
        // (which consumes `self`), so after an explicit close the block
        // stream is closed a second time here. The `if let Ok` guard
        // only prevents a duplicate `complete` RPC if the second
        // `blk_stream.close` returns Err on an already-closed stream —
        // confirm against BlockWriteStream::close.
        if let Ok(b) = self.blk_stream.close(&mut self.ipc) {
            let req = CompleteRequestProto {
                src: self.path.clone(),
                client_name: self.client_name.clone(),
                last: Some(b),
                file_id: self.fs.file_id,
            };
            // Ignore the RPC result: drop is best-effort by design.
            self.ipc.complete(req).ok();
        }
    }
}