1use std::str::FromStr as _;
2
3use mountpoint_s3_client::ObjectClient;
4use mountpoint_s3_client::types::ETag;
5use tracing::{debug, error};
6
7use crate::fs::InodeError;
8use crate::metablock::{Lookup, Metablock, NewHandle, PendingUploadHook, ReadWriteMode, S3Location};
9use crate::object::ObjectId;
10use crate::prefetch::{HandleId, PrefetchGetObject};
11use crate::sync::{Arc, AsyncMutex};
12use crate::upload::{AppendUploadRequest, UploadRequest};
13
14use super::{Error, InodeNo, OpenFlags, S3Filesystem, ToErrno};
15
/// A handle for an open file: the inode it refers to, its S3 location, and
/// the per-handle read/write state.
#[derive(Debug)]
pub struct FileHandle<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// Inode number of the open file.
    pub ino: InodeNo,
    /// S3 bucket/key location backing this file.
    pub location: S3Location,
    /// Read or write state for this handle, behind an async mutex so
    /// concurrent operations on the same handle are serialized.
    pub state: AsyncMutex<FileHandleState<Client>>,
    /// PID recorded when the handle was opened; compared against the caller's
    /// PID later to decide whether to complete an upload (see `UploadState::complete`).
    pub open_pid: u32,
}
27
28impl<Client> FileHandle<Client>
29where
30 Client: ObjectClient + Clone + Send + Sync + 'static,
31{
32 pub fn file_name(&self) -> &str {
33 self.location.name()
34 }
35}
36
/// Per-handle state: a handle is opened either for reading or for writing.
#[derive(Debug)]
pub enum FileHandleState<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// Handle opened for reading; reads are served by a prefetching GET request.
    Read {
        request: PrefetchGetObject<Client>,
        // NOTE(review): initialized to `false` in `FileHandleState::new`;
        // presumably flipped when the handle is flushed — confirm at call sites.
        flushed: bool,
    },
    /// Handle opened for writing; writes go through an in-progress upload.
    Write {
        state: UploadState<Client>,
        // NOTE(review): initialized to `false` in `FileHandleState::new`;
        // presumably flipped when the handle is flushed — confirm at call sites.
        flushed: bool,
    },
}
55
impl<Client> FileHandleState<Client>
where
    Client: ObjectClient + Clone + Send + Sync,
{
    /// Creates the initial state for a newly opened handle `fh`.
    ///
    /// For read handles this starts a prefetching GET pinned to the object's
    /// current ETag. For write handles it starts either an incremental
    /// (append) upload or an atomic upload, depending on the filesystem's
    /// configured write mode.
    ///
    /// # Errors
    /// - `EBADF` if a read handle's inode has no ETag.
    /// - `EIO` if an atomic upload fails to start.
    /// - Whatever `s3_location()` returns when the inode has no S3 location.
    pub async fn new(
        fh: u64,
        handle: &NewHandle,
        flags: OpenFlags,
        fs: &S3Filesystem<Client>,
    ) -> Result<FileHandleState<Client>, Error> {
        let ino = handle.lookup.ino();
        let stat = handle.lookup.stat();
        let location = handle.lookup.s3_location()?;
        let full_key = location.full_key();
        let bucket = location.bucket_name();

        match handle.mode {
            ReadWriteMode::Read => {
                let object_size = stat.size as u64;
                // Reads must be pinned to a single object version, so a known
                // ETag is required; without one the open fails with EBADF.
                let etag = match &stat.etag {
                    None => return Err(err!(libc::EBADF, "no E-Tag for inode {}", ino)),
                    Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"),
                };
                let object_id = ObjectId::new(full_key.into(), etag);
                let request = fs
                    .prefetcher
                    .prefetch(bucket.to_string(), object_id, HandleId::new(fh), object_size);
                let handle = FileHandleState::Read {
                    request,
                    flushed: false,
                };
                metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0);
                Ok(handle)
            }
            ReadWriteMode::Write => {
                let is_truncate = flags.contains(OpenFlags::O_TRUNC);
                let write_mode = fs.config.write_mode();

                let upload_state = if write_mode.incremental_upload {
                    // O_TRUNC discards existing content: start from offset 0
                    // with no initial ETag. Otherwise append after the current
                    // size, on top of the existing object's ETag.
                    let initial_etag = if is_truncate {
                        None
                    } else {
                        stat.etag.as_ref().map(|e| e.into())
                    };
                    let current_offset = if is_truncate { 0 } else { stat.size as u64 };
                    let request = fs.uploader.start_incremental_upload(
                        bucket.to_string(),
                        full_key.into(),
                        current_offset,
                        initial_etag.clone(),
                    );
                    UploadState::AppendInProgress {
                        request,
                        initial_etag,
                        written_bytes: 0,
                    }
                } else {
                    let request = fs
                        .uploader
                        .start_atomic_upload(bucket.to_string(), full_key.into())
                        .map_err(|e| err!(libc::EIO, source:e, "put failed to start"))?;
                    UploadState::MPUInProgress { request }
                };
                let handle = FileHandleState::Write {
                    state: upload_state,
                    flushed: false,
                };
                metrics::gauge!("fs.current_handles", "type" => "write").increment(1.0);
                Ok(handle)
            }
        }
    }
}
129
/// State machine for the upload behind a write handle.
#[derive(Debug)]
pub enum UploadState<Client: ObjectClient + Send + Sync> {
    /// An incremental (append) upload is in progress.
    AppendInProgress {
        request: AppendUploadRequest<Client>,
        /// ETag of the object the upload started from; `None` when the handle
        /// was opened with O_TRUNC (see `FileHandleState::new`).
        initial_etag: Option<ETag>,
        /// Bytes successfully written through this handle so far; used by
        /// `complete` to skip completion when nothing was written.
        written_bytes: usize,
    },
    /// An atomic (multi-part) upload is in progress.
    MPUInProgress {
        request: UploadRequest<Client>,
    },
    /// The upload finished; further writes fail with EIO.
    Completed,
    /// The upload was aborted; the stored errno is returned on further operations.
    Failed(libc::c_int),
}
144
impl<Client> UploadState<Client>
where
    Client: ObjectClient + Send + Sync + Clone + 'static,
{
    /// Writes `data` at `offset` through the in-progress upload and, on
    /// success, bumps the inode's recorded size by the written length.
    ///
    /// On a write error the state transitions to `Failed(errno)` and the
    /// inode's writing status is cleaned up. Writing to a `Completed` handle
    /// fails with `EIO`; writing to a `Failed` handle returns the stored errno.
    pub async fn write(
        &mut self,
        fs: &S3Filesystem<Client>,
        handle: &FileHandle<Client>,
        offset: i64,
        data: &[u8],
        fh: u64,
    ) -> Result<u32, Error> {
        let result: Result<_, Error> = match self {
            UploadState::AppendInProgress {
                request, written_bytes, ..
            } => match request.write(offset as u64, data).await {
                Ok(len) => {
                    // Track how much this handle has written so `complete`
                    // can tell whether anything needs to be finished.
                    *written_bytes += len;
                    Ok(len)
                }
                Err(e) => Err(e.into()),
            },
            UploadState::MPUInProgress { request, .. } => match request.write(offset, data).await {
                Ok(len) => Ok(len),
                Err(e) => Err(e.into()),
            },
            UploadState::Completed => {
                return Err(err!(libc::EIO, "upload already completed for key {}", handle.location));
            }
            UploadState::Failed(e) => {
                return Err(err!(*e, "upload already aborted for key {}", handle.location));
            }
        };

        match result {
            Ok(len) => {
                fs.metablock.inc_file_size(handle.ino, len).await?;
                Ok(len as u32)
            }
            Err(e) => {
                // Record the failure. The replaced state must have been one of
                // the in-progress variants (the terminal ones returned above),
                // so mark the inode as no longer being written.
                match std::mem::replace(self, UploadState::Failed(e.to_errno())) {
                    UploadState::MPUInProgress { .. } | UploadState::AppendInProgress { .. } => {
                        Self::finish_on_error(fs.metablock.clone(), handle.ino, &handle.location, fh).await;
                    }
                    UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
                }
                Err(e)
            }
        }
    }

    /// Commits what has been written so far without closing the handle.
    ///
    /// For an append upload this completes the current request and then starts
    /// a fresh incremental upload at the same offset so the handle can keep
    /// writing. For an MPU it completes the whole upload. On error the state
    /// transitions to `Failed` with the mapped errno.
    pub async fn commit(
        &mut self,
        fs: &S3Filesystem<Client>,
        handle: Arc<FileHandle<Client>>,
        fh: u64,
    ) -> Result<(), Error> {
        match &self {
            // Committing an already-completed upload is a no-op.
            UploadState::Completed => return Ok(()),
            UploadState::Failed(e) => {
                return Err(err!(*e, "upload already aborted for key {}", handle.location));
            }
            _ => {}
        };

        // Take ownership of the in-progress request, optimistically leaving
        // `Completed` in place; error paths below overwrite it with `Failed`.
        match std::mem::replace(self, UploadState::Completed) {
            UploadState::AppendInProgress {
                request,
                initial_etag,
                written_bytes,
            } => {
                let current_offset = request.current_offset();
                let etag = Self::commit_append(request, &handle.location)
                    .await
                    .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?;

                // Prefer the ETag produced by the commit; if no PUT was
                // required, keep the ETag the upload started from.
                let initial_etag = etag.or(initial_etag);
                let request = fs.uploader.start_incremental_upload(
                    handle.location.bucket_name().to_owned(),
                    handle.location.full_key().to_string(),
                    current_offset,
                    initial_etag.clone(),
                );
                *self = UploadState::AppendInProgress {
                    request,
                    initial_etag: initial_etag.clone(),
                    written_bytes,
                };
            }
            UploadState::MPUInProgress { request, .. } => {
                Self::complete_upload(fs.metablock.clone(), handle.ino, &handle.location, request, fh)
                    .await
                    .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?;
            }
            UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
        }
        Ok(())
    }

    /// Completes the upload for this handle.
    ///
    /// Completion is skipped — flushing the writer instead — when nothing was
    /// written yet, or when the calling process (`pid`) is not the same
    /// process that opened the handle (`open_pid`). For append uploads the
    /// skip path still commits what was written before flushing.
    pub async fn complete(
        &mut self,
        fs: &S3Filesystem<Client>,
        handle: Arc<FileHandle<Client>>,
        pid: u32,
        open_pid: u32,
        fh: u64,
    ) -> Result<(), Error> {
        match self {
            UploadState::AppendInProgress { written_bytes, .. } => {
                if *written_bytes == 0 || !are_from_same_process(open_pid, pid) {
                    // Persist what was written (if anything) but keep the
                    // handle open for more writes; just flush the writer.
                    self.commit(fs, handle.clone(), fh).await?;
                    return Self::flush_writer(fs, handle.ino, handle.clone(), fh).await;
                }
            }
            UploadState::MPUInProgress { request, .. } => {
                if request.size() == 0 {
                    debug!(key=%handle.location, "not completing upload because nothing was written yet");
                    return Self::flush_writer(fs, handle.ino, handle.clone(), fh).await;
                }
                if !are_from_same_process(open_pid, pid) {
                    debug!(
                        key=%handle.location,
                        pid, open_pid, "not completing upload because current PID differs from PID at open",
                    );
                    return Self::flush_writer(fs, handle.ino, handle.clone(), fh).await;
                }
            }
            UploadState::Completed => return Ok(()),
            UploadState::Failed(e) => {
                return Err(err!(
                    *e,
                    "upload already aborted for key {:?}",
                    handle.location.full_key()
                ));
            }
        };

        // Take ownership of the in-progress request; error paths restore
        // `Failed` over the optimistic `Completed`.
        match std::mem::replace(self, UploadState::Completed) {
            UploadState::AppendInProgress {
                request, initial_etag, ..
            } => Self::complete_append(
                fs.metablock.clone(),
                handle.ino,
                &handle.location,
                request,
                initial_etag,
                fh,
            )
            .await
            .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?,
            UploadState::MPUInProgress { request, .. } => {
                Self::complete_upload(fs.metablock.clone(), handle.ino, &handle.location, request, fh)
                    .await
                    .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?
            }
            UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
        };
        Ok(())
    }

    /// Completes any still-pending upload, returning the updated lookup, or
    /// `Ok(None)` when the upload already reached a terminal state.
    ///
    /// NOTE(review): unlike `complete`/`commit`, an error here leaves the
    /// state as `Completed` rather than `Failed` — confirm this asymmetry is
    /// intentional at the call site.
    pub async fn complete_pending_upload(
        &mut self,
        metablock: Arc<dyn Metablock>,
        ino: InodeNo,
        key: &S3Location,
        fh: u64,
    ) -> Result<Option<Lookup>, InodeError> {
        match self {
            UploadState::Completed | UploadState::Failed(_) => return Ok(None),
            _ => {}
        }

        match std::mem::replace(self, UploadState::Completed) {
            UploadState::AppendInProgress {
                request, initial_etag, ..
            } => Ok(Some(
                Self::complete_append(metablock, ino, key, request, initial_etag, fh).await?,
            )),
            UploadState::MPUInProgress { request, .. } => {
                Ok(Some(Self::complete_upload(metablock, ino, key, request, fh).await?))
            }
            UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
        }
    }

    /// Completes an atomic (MPU) upload and records the resulting ETag with
    /// the metablock; on failure, marks writing as finished with no ETag.
    async fn complete_upload(
        metablock: Arc<dyn Metablock>,
        ino: InodeNo,
        key: &S3Location,
        upload: UploadRequest<Client>,
        fh: u64,
    ) -> Result<Lookup, InodeError> {
        let size = upload.size();
        match upload.complete().await {
            Ok(put_result) => {
                debug!(etag=?put_result.etag.as_str(), %key, size, "put succeeded");
                metablock.finish_writing(ino, Some(put_result.etag), fh).await
            }
            Err(e) => {
                Self::finish_on_error(metablock, ino, key, fh).await;
                Err(InodeError::upload_error(e, key.clone()))
            }
        }
    }

    /// Completes an append upload and records the resulting ETag with the
    /// metablock, falling back to `initial_etag` when no PUT was required.
    async fn complete_append(
        metablock: Arc<dyn Metablock>,
        ino: InodeNo,
        key: &S3Location,
        upload: AppendUploadRequest<Client>,
        initial_etag: Option<ETag>,
        fh: u64,
    ) -> Result<Lookup, InodeError> {
        match Self::commit_append(upload, key).await {
            Ok(etag) => {
                let etag = etag.or(initial_etag);
                metablock.finish_writing(ino, etag, fh).await
            }
            Err(err) => {
                Self::finish_on_error(metablock, ino, key, fh).await;
                Err(err)
            }
        }
    }

    /// Finishes an append upload request. Returns `Ok(Some(etag))` when an
    /// object was written, `Ok(None)` when no PUT was required.
    async fn commit_append(upload: AppendUploadRequest<Client>, key: &S3Location) -> Result<Option<ETag>, InodeError> {
        match upload.complete().await {
            Ok(Some(result)) => {
                debug!(%key, "put succeeded");
                Ok(Some(result.etag))
            }
            Ok(None) => {
                debug!(%key, "no put required");
                Ok(None)
            }
            Err(e) => Err(InodeError::upload_error(e, key.clone())),
        }
    }

    /// Best-effort cleanup after a failed upload: marks writing as finished
    /// with no new ETag, logging (but not propagating) any metablock error.
    async fn finish_on_error(metablock: Arc<dyn Metablock>, ino: InodeNo, s3location: &S3Location, fh: u64) {
        if let Err(err) = metablock.finish_writing(ino, None, fh).await {
            error!(?err, key=?s3location.full_key(), "error updating the inode status");
        }
    }

    /// Flushes the writer for `ino` via the metablock, passing a hook that can
    /// complete a pending upload on its behalf.
    async fn flush_writer(
        fs: &S3Filesystem<Client>,
        ino: InodeNo,
        handle: Arc<FileHandle<Client>>,
        fh: u64,
    ) -> Result<(), Error> {
        let pending_upload_hook = PendingUploadHook::new(fs.metablock.clone(), handle, fh);
        fs.metablock.flush_writer(ino, fh, pending_upload_hook).await?;
        Ok(())
    }
}
425
/// Looks up the thread-group id (Tgid) for `pid` by reading
/// `/proc/{pid}/task/{pid}/status`.
///
/// Returns `None` on macOS (no procfs), when the process does not exist, or
/// when the status file cannot be read or parsed.
fn get_tgid(pid: u32) -> Option<u32> {
    // procfs only exists on Linux-like targets; on macOS this is a no-op.
    if cfg!(not(target_os = "macos")) {
        use std::fs::File;
        use std::io::{BufRead, BufReader};

        let path = format!("/proc/{pid}/task/{pid}/status");
        let file = File::open(path).ok()?;
        for line in BufReader::new(file).lines() {
            let line = line.ok()?;
            // The status file formats this field as "Tgid:\t<pid>". Using
            // `strip_prefix` + `trim` avoids the panic the previous
            // `line["Tgid: ".len()..]` slice could hit on a line exactly
            // "Tgid:" (prefix check was 5 chars, slice started at 6).
            if let Some(rest) = line.strip_prefix("Tgid:") {
                return rest.trim().parse::<u32>().ok();
            }
        }
    }

    None
}
448
449fn are_from_same_process(pid1: u32, pid2: u32) -> bool {
451 if pid1 == pid2 {
452 return true;
453 }
454 let Some(tgid1) = get_tgid(pid1) else {
455 return false;
456 };
457 let Some(tgid2) = get_tgid(pid2) else {
458 return false;
459 };
460 tgid1 == tgid2
461}