Skip to main content

mountpoint_s3_fs/fs/
handles.rs

1use std::str::FromStr as _;
2
3use mountpoint_s3_client::ObjectClient;
4use mountpoint_s3_client::types::ETag;
5use tracing::{debug, error};
6
7use crate::fs::InodeError;
8use crate::metablock::{Lookup, Metablock, NewHandle, PendingUploadHook, ReadWriteMode, S3Location};
9use crate::object::ObjectId;
10use crate::prefetch::{HandleId, PrefetchGetObject};
11use crate::sync::{Arc, AsyncMutex};
12use crate::upload::{AppendUploadRequest, UploadRequest};
13
14use super::{Error, InodeNo, OpenFlags, S3Filesystem, ToErrno};
15
/// A handle to an open file, created by `open`/`create` and held until `release`.
///
/// The handle is either a read handle or a write handle, tracked by [`FileHandleState`].
#[derive(Debug)]
pub struct FileHandle<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// Inode number of the open file.
    pub ino: InodeNo,
    /// Bucket and key the file maps to in S3.
    pub location: S3Location,
    /// Read or write state of the handle, guarded so concurrent filesystem
    /// callbacks on the same handle serialize their access.
    pub state: AsyncMutex<FileHandleState<Client>>,
    /// Process that created the handle
    pub open_pid: u32,
}
27
28impl<Client> FileHandle<Client>
29where
30    Client: ObjectClient + Clone + Send + Sync + 'static,
31{
32    pub fn file_name(&self) -> &str {
33        self.location.name()
34    }
35}
36
/// Read/write state of a [`FileHandle`], fixed at open time by [`FileHandleState::new`].
#[derive(Debug)]
pub enum FileHandleState<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// The file handle has been assigned as a read handle
    Read {
        /// Prefetching GetObject request serving reads on this handle.
        request: PrefetchGetObject<Client>,
        /// Set to true when `flush` called on the handle, and unset on a `read`
        flushed: bool,
    },
    /// The file handle has been assigned as a write handle
    Write {
        /// In-progress (or terminal) upload backing this handle's writes.
        state: UploadState<Client>,
        /// Set to true when `flush` called on the handle, and unset on a `write`
        flushed: bool,
    },
}
55
56impl<Client> FileHandleState<Client>
57where
58    Client: ObjectClient + Clone + Send + Sync,
59{
60    pub async fn new(
61        fh: u64,
62        handle: &NewHandle,
63        flags: OpenFlags,
64        fs: &S3Filesystem<Client>,
65    ) -> Result<FileHandleState<Client>, Error> {
66        let ino = handle.lookup.ino();
67        let stat = handle.lookup.stat();
68        let location = handle.lookup.s3_location()?;
69        let full_key = location.full_key();
70        let bucket = location.bucket_name();
71
72        match handle.mode {
73            ReadWriteMode::Read => {
74                let object_size = stat.size as u64;
75                let etag = match &stat.etag {
76                    None => return Err(err!(libc::EBADF, "no E-Tag for inode {}", ino)),
77                    Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"),
78                };
79                let object_id = ObjectId::new(full_key.into(), etag);
80                let request = fs
81                    .prefetcher
82                    .prefetch(bucket.to_string(), object_id, HandleId::new(fh), object_size);
83                let handle = FileHandleState::Read {
84                    request,
85                    flushed: false,
86                };
87                metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0);
88                Ok(handle)
89            }
90            ReadWriteMode::Write => {
91                let is_truncate = flags.contains(OpenFlags::O_TRUNC);
92                let write_mode = fs.config.write_mode();
93
94                let upload_state = if write_mode.incremental_upload {
95                    let initial_etag = if is_truncate {
96                        None
97                    } else {
98                        stat.etag.as_ref().map(|e| e.into())
99                    };
100                    let current_offset = if is_truncate { 0 } else { stat.size as u64 };
101                    let request = fs.uploader.start_incremental_upload(
102                        bucket.to_string(),
103                        full_key.into(),
104                        current_offset,
105                        initial_etag.clone(),
106                    );
107                    UploadState::AppendInProgress {
108                        request,
109                        initial_etag,
110                        written_bytes: 0,
111                    }
112                } else {
113                    let request = fs
114                        .uploader
115                        .start_atomic_upload(bucket.to_string(), full_key.into())
116                        .map_err(|e| err!(libc::EIO, source:e, "put failed to start"))?;
117                    UploadState::MPUInProgress { request }
118                };
119                let handle = FileHandleState::Write {
120                    state: upload_state,
121                    flushed: false,
122                };
123                metrics::gauge!("fs.current_handles", "type" => "write").increment(1.0);
124                Ok(handle)
125            }
126        }
127    }
128}
129
/// State of the upload backing a write handle.
#[derive(Debug)]
pub enum UploadState<Client: ObjectClient + Send + Sync> {
    /// An incremental (append) upload is in progress.
    AppendInProgress {
        request: AppendUploadRequest<Client>,
        /// ETag the object had when the upload (or latest restart) began;
        /// `None` when the object was truncated or did not exist.
        initial_etag: Option<ETag>,
        /// Bytes successfully written through this handle so far.
        written_bytes: usize,
    },
    /// An atomic (multi-part) upload is in progress.
    MPUInProgress {
        request: UploadRequest<Client>,
    },
    /// The upload finished; further writes fail with `EIO`.
    Completed,
    // Remember the failure reason to respond to retries
    Failed(libc::c_int),
}
144
impl<Client> UploadState<Client>
where
    Client: ObjectClient + Send + Sync + Clone + 'static,
{
    /// Write `data` at `offset` through the in-progress upload and bump the
    /// inode's recorded file size by the bytes written.
    ///
    /// On a write error the upload is abandoned: the state becomes
    /// [`UploadState::Failed`] (so retries observe the same errno) and the
    /// inode's writer is finished with no new ETag.
    pub async fn write(
        &mut self,
        fs: &S3Filesystem<Client>,
        handle: &FileHandle<Client>,
        offset: i64,
        data: &[u8],
        fh: u64,
    ) -> Result<u32, Error> {
        let result: Result<_, Error> = match self {
            UploadState::AppendInProgress {
                request, written_bytes, ..
            } => match request.write(offset as u64, data).await {
                Ok(len) => {
                    *written_bytes += len;
                    Ok(len)
                }
                Err(e) => Err(e.into()),
            },
            UploadState::MPUInProgress { request, .. } => match request.write(offset, data).await {
                Ok(len) => Ok(len),
                Err(e) => Err(e.into()),
            },
            UploadState::Completed => {
                return Err(err!(libc::EIO, "upload already completed for key {}", handle.location));
            }
            UploadState::Failed(e) => {
                return Err(err!(*e, "upload already aborted for key {}", handle.location));
            }
        };

        match result {
            Ok(len) => {
                fs.metablock.inc_file_size(handle.ino, len).await?;
                Ok(len as u32)
            }
            Err(e) => {
                // Abort the request.
                // Swap in `Failed` first so the in-progress request is dropped
                // even if finishing the writer below has problems.
                match std::mem::replace(self, UploadState::Failed(e.to_errno())) {
                    UploadState::MPUInProgress { .. } | UploadState::AppendInProgress { .. } => {
                        Self::finish_on_error(fs.metablock.clone(), handle.ino, &handle.location, fh).await;
                    }
                    UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
                }
                Err(e)
            }
        }
    }

    /// Commit data to S3 and mark the upload as completed. In case it is an append request, start
    /// a new request with the current offset and new etag.
    pub async fn commit(
        &mut self,
        fs: &S3Filesystem<Client>,
        handle: Arc<FileHandle<Client>>,
        fh: u64,
    ) -> Result<(), Error> {
        // Terminal states are left untouched: committing twice is a no-op,
        // and a failed upload keeps reporting its original errno.
        match &self {
            UploadState::Completed => return Ok(()),
            UploadState::Failed(e) => {
                return Err(err!(*e, "upload already aborted for key {}", handle.location));
            }
            _ => {}
        };

        // Take ownership of the in-progress request; `Completed` is only a
        // placeholder and is overwritten below on both success and failure.
        match std::mem::replace(self, UploadState::Completed) {
            UploadState::AppendInProgress {
                request,
                initial_etag,
                written_bytes,
            } => {
                let current_offset = request.current_offset();
                let etag = Self::commit_append(request, &handle.location)
                    .await
                    .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?;

                // Restart append request.
                let initial_etag = etag.or(initial_etag);
                let request = fs.uploader.start_incremental_upload(
                    handle.location.bucket_name().to_owned(),
                    handle.location.full_key().to_string(),
                    current_offset,
                    initial_etag.clone(),
                );
                *self = UploadState::AppendInProgress {
                    request,
                    initial_etag: initial_etag.clone(),
                    written_bytes,
                };
            }
            UploadState::MPUInProgress { request, .. } => {
                Self::complete_upload(fs.metablock.clone(), handle.ino, &handle.location, request, fh)
                    .await
                    .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?;
            }
            UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
        }
        Ok(())
    }

    /// Commit any buffered data (if written by the opener-process) to S3, and mark the upload as
    /// completed. In case there is no data written, or if it is written by a different process,
    /// don't complete the upload but mark the handle as flushed.
    pub async fn complete(
        &mut self,
        fs: &S3Filesystem<Client>,
        handle: Arc<FileHandle<Client>>,
        pid: u32,
        open_pid: u32,
        fh: u64,
    ) -> Result<(), Error> {
        match self {
            UploadState::AppendInProgress { written_bytes, .. } => {
                if *written_bytes == 0 || !are_from_same_process(open_pid, pid) {
                    // Commit current changes. But don't close the write handle, only mark it as flushed
                    self.commit(fs, handle.clone(), fh).await?;
                    return Self::flush_writer(fs, handle.ino, handle.clone(), fh).await;
                }
            }
            UploadState::MPUInProgress { request, .. } => {
                if request.size() == 0 {
                    debug!(key=%handle.location, "not completing upload because nothing was written yet");
                    return Self::flush_writer(fs, handle.ino, handle.clone(), fh).await;
                }
                if !are_from_same_process(open_pid, pid) {
                    debug!(
                        key=%handle.location,
                        pid, open_pid, "not completing upload because current PID differs from PID at open",
                    );
                    return Self::flush_writer(fs, handle.ino, handle.clone(), fh).await;
                }
            }
            UploadState::Completed => return Ok(()),
            UploadState::Failed(e) => {
                return Err(err!(
                    *e,
                    "upload already aborted for key {:?}",
                    handle.location.full_key()
                ));
            }
        };

        // Move the request out; on failure the `Completed` placeholder is
        // replaced with `Failed` carrying the errno for later retries.
        match std::mem::replace(self, UploadState::Completed) {
            UploadState::AppendInProgress {
                request, initial_etag, ..
            } => Self::complete_append(
                fs.metablock.clone(),
                handle.ino,
                &handle.location,
                request,
                initial_etag,
                fh,
            )
            .await
            .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?,
            UploadState::MPUInProgress { request, .. } => {
                Self::complete_upload(fs.metablock.clone(), handle.ino, &handle.location, request, fh)
                    .await
                    .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?
            }
            UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
        };
        Ok(())
    }

    /// Check state of upload, and complete the upload if it's still in-progress (i.e. not completed).
    ///
    /// When successful, returns a [`Lookup`] where the upload was still in-progress and thus
    /// completed by this method call.
    ///
    /// This is only called by the PendingUploadHook
    pub async fn complete_pending_upload(
        &mut self,
        metablock: Arc<dyn Metablock>,
        ino: InodeNo,
        key: &S3Location,
        fh: u64,
    ) -> Result<Option<Lookup>, InodeError> {
        // We do two rounds of `match` here because we want to retain the UploadState in case it is
        // already terminal.
        // State can only be "Failed" if there has been a previous upload attempt, in which case the
        // data has been lost already, but the PendingUploadHook will have invalidated the cache
        // for future requests to force revalidation of inode metadata.
        match self {
            // TODO: good to have - relay the error from the previous attempt here
            UploadState::Completed | UploadState::Failed(_) => return Ok(None),
            _ => {}
        }

        match std::mem::replace(self, UploadState::Completed) {
            UploadState::AppendInProgress {
                request, initial_etag, ..
            } => Ok(Some(
                Self::complete_append(metablock, ino, key, request, initial_etag, fh).await?,
            )),
            UploadState::MPUInProgress { request, .. } => {
                Ok(Some(Self::complete_upload(metablock, ino, key, request, fh).await?))
            }
            UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
        }
    }

    /// Complete an atomic (MPU) upload and record the resulting ETag on the inode.
    ///
    /// On failure, the inode's writer is still finished (with no ETag) before
    /// the error is returned, so the inode doesn't stay in a writing state.
    async fn complete_upload(
        metablock: Arc<dyn Metablock>,
        ino: InodeNo,
        key: &S3Location,
        upload: UploadRequest<Client>,
        fh: u64,
    ) -> Result<Lookup, InodeError> {
        let size = upload.size();
        match upload.complete().await {
            Ok(put_result) => {
                debug!(etag=?put_result.etag.as_str(), %key, size, "put succeeded");
                metablock.finish_writing(ino, Some(put_result.etag), fh).await
            }
            Err(e) => {
                Self::finish_on_error(metablock, ino, key, fh).await;
                Err(InodeError::upload_error(e, key.clone()))
            }
        }
    }

    /// Complete an append upload and record the new ETag on the inode, falling
    /// back to `initial_etag` when the final flush had nothing to upload.
    ///
    /// On failure, the inode's writer is finished (with no ETag) before the
    /// error is returned.
    async fn complete_append(
        metablock: Arc<dyn Metablock>,
        ino: InodeNo,
        key: &S3Location,
        upload: AppendUploadRequest<Client>,
        initial_etag: Option<ETag>,
        fh: u64,
    ) -> Result<Lookup, InodeError> {
        match Self::commit_append(upload, key).await {
            Ok(etag) => {
                let etag = etag.or(initial_etag);
                metablock.finish_writing(ino, etag, fh).await
            }
            Err(err) => {
                Self::finish_on_error(metablock, ino, key, fh).await;
                Err(err)
            }
        }
    }

    /// Finish an append request. Returns the new ETag when data was uploaded,
    /// or `None` when there was nothing to flush (no put required).
    async fn commit_append(upload: AppendUploadRequest<Client>, key: &S3Location) -> Result<Option<ETag>, InodeError> {
        match upload.complete().await {
            Ok(Some(result)) => {
                debug!(%key, "put succeeded");
                Ok(Some(result.etag))
            }
            Ok(None) => {
                debug!(%key, "no put required");
                Ok(None)
            }
            Err(e) => Err(InodeError::upload_error(e, key.clone())),
        }
    }

    /// Best-effort: mark the inode's writer finished with no new ETag after a
    /// failed upload. Errors here are logged, not propagated.
    async fn finish_on_error(metablock: Arc<dyn Metablock>, ino: InodeNo, s3location: &S3Location, fh: u64) {
        if let Err(err) = metablock.finish_writing(ino, None, fh).await {
            // Log the issue but still return put_result.
            error!(?err, key=?s3location.full_key(), "error updating the inode status");
        }
    }

    /// Mark the write-handle as deactivated in the inode's handle_map entry, and attach a
    /// PendingUploadHook to the inode for a future release/open to complete the delayed upload
    /// and clean up the writer.
    async fn flush_writer(
        fs: &S3Filesystem<Client>,
        ino: InodeNo,
        handle: Arc<FileHandle<Client>>,
        fh: u64,
    ) -> Result<(), Error> {
        let pending_upload_hook = PendingUploadHook::new(fs.metablock.clone(), handle, fh);
        fs.metablock.flush_writer(ino, fh, pending_upload_hook).await?;
        Ok(())
    }
}
425
/// Get the thread-group id (tgid) from a process id (pid).
/// Despite the names, the process id is actually the thread id
/// and the thread-group id is the parent process id.
/// Returns `None` if unable to find or parse the task status.
/// Not supported on macOS.
fn get_tgid(pid: u32) -> Option<u32> {
    if cfg!(not(target_os = "macos")) {
        use std::fs::File;
        use std::io::{BufRead, BufReader};

        let path = format!("/proc/{pid}/task/{pid}/status");
        let file = File::open(path).ok()?;
        for line in BufReader::new(file).lines() {
            let line = line.ok()?;
            // `strip_prefix` replaces the old fixed-width slice
            // (`line["Tgid: ".len()..]`), which would panic on a line that is
            // exactly "Tgid:" and silently dropped one character past the
            // 5-char prefix it actually matched (the kernel separates the
            // value with a tab, not a space).
            if let Some(value) = line.strip_prefix("Tgid:") {
                return value.trim().parse::<u32>().ok();
            }
        }
    }

    None
}
448
449/// Check whether two pids correspond to the same process.
450fn are_from_same_process(pid1: u32, pid2: u32) -> bool {
451    if pid1 == pid2 {
452        return true;
453    }
454    let Some(tgid1) = get_tgid(pid1) else {
455        return false;
456    };
457    let Some(tgid2) = get_tgid(pid2) else {
458        return false;
459    };
460    tgid1 == tgid2
461}