shared_files/
lib.rs

1//! # Disk-Based Single-Writer, Multiple-Reader In-Process File Sharing
2//!
3//! Functionality for single-writer, multiple-reader file operations where multiple concurrent
4//! readers need to read from a file that is currently being written by the same process. The
5//! intended use case is the parallel processing of byte streams with minimum (process) memory
6//! requirements, e.g. in web services moving around large files.
7//!
8//! Normally, reading a file while it is written results in the read stream ending prematurely
9//! as EOF; the purpose of this crate is to prevent exactly that.
10//!
11//! Any file type can be used as a backing as long as it implements the crate's [`SharedFileType`]
12//! trait, which in turn requires [`AsyncWrite`](tokio::io::AsyncWrite) and [`AsyncRead`](tokio::io::AsyncRead).
13//!
14//! ## Crate Features
15//!
16//! - `async-tempfile`: Enables the [`SharedTemporaryFile`] type via the
17//!   [async-tempfile](https://github.com/sunsided/async-tempfile-rs) crate. Since this is how
18//!   this crate was initially meant to be used, this feature is enabled by default.
19
20#![cfg_attr(docsrs, feature(doc_cfg))]
21#![allow(unsafe_code)]
22
23mod reader;
24
25mod errors;
26#[cfg_attr(docsrs, doc(cfg(feature = "async-tempfile")))]
27#[cfg(feature = "async-tempfile")]
28mod temp_file;
29mod traits;
30mod writer;
31
32use crossbeam::atomic::AtomicCell;
33use std::collections::HashMap;
34use std::path::PathBuf;
35use std::sync::{Arc, Mutex};
36use std::task::Waker;
37use uuid::Uuid;
38
39pub use reader::{FileSize, SharedFileReader};
40pub use traits::*;
41pub use writer::SharedFileWriter;
42
43/// Prelude for commonly used types and traits.
44pub mod prelude {
45    pub use crate::errors::*;
46    pub use crate::traits::*;
47    pub use crate::SharedFile;
48
49    #[cfg_attr(docsrs, doc(cfg(feature = "async-tempfile")))]
50    #[cfg(feature = "async-tempfile")]
51    pub use crate::SharedTemporaryFile;
52}
53
54#[cfg_attr(docsrs, doc(cfg(feature = "async-tempfile")))]
55#[cfg(feature = "async-tempfile")]
56pub use temp_file::*;
57
58/// A file with shared read/write access for in-process file sharing.
59///
60/// ## Writer / Reader Synchronization
61/// Since this wrapper takes over control of the write operation, readers
62/// will only be woken up on a call to [`SharedFileWriter::sync_data`],
63/// [`SharedFileWriter::sync_all`] or [`SharedFileWriter::flush`]. This is to
64/// ensure that data is actually written to the underlying buffer before
65/// the readers attempt to read it back.
66///
67/// ## Writer Finalization
68/// When a writer is dropped, it will move the state of the [`SharedFile`] to
69/// [`WriteState::Completed`]. It is important to note that drop is not asynchronous
70/// and therefore no flush to disk can be performed on the wrapped file.
71///
72/// <div class="warning">User code must make sure to manually sync to disk before dropping the writer.</div>
73#[derive(Debug)]
74pub struct SharedFile<T> {
75    /// The sentinel value to keep the file alive.
76    sentinel: Arc<Sentinel<T>>,
77}
78
79#[derive(Debug)]
80struct Sentinel<T> {
81    /// The original file. This keeps the file open until all references are dropped.
82    original: T,
83    /// The state of the write operation.
84    state: AtomicCell<WriteState>,
85    /// Wakers to wake up all interested readers.
86    wakers: Mutex<HashMap<Uuid, Waker>>,
87}
88
/// The state of a file write operation.
///
/// The type is `Copy` so that it can be stored in an `AtomicCell`. It also derives
/// `PartialEq`/`Eq` so states can be compared directly, which is what
/// `AtomicCell::compare_exchange` requires for compare-and-swap state transitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriteState {
    /// The write operation is pending. Contains the committed byte count and the written byte count.
    Pending(usize, usize),
    /// The write operation completed. Contains the number of bytes written (and committed).
    Completed(usize),
    /// The write operation failed.
    Failed,
}
99
100impl<T> SharedFile<T>
101where
102    T: SharedFileType<Type = T>,
103{
104    /// Synchronously creates a new temporary file.
105    pub fn new() -> Result<SharedFile<T>, T::Error>
106    where
107        T: NewFile<Target = T>,
108    {
109        let file = T::new()?;
110        Ok(Self::from(file))
111    }
112
113    /// Asynchronously creates a new temporary file.
114    pub async fn new_async() -> Result<SharedFile<T>, T::Error>
115    where
116        T: AsyncNewFile<Target = T>,
117    {
118        let file = T::new_async().await?;
119        Ok(Self::from(file))
120    }
121
122    /// Creates a writer for the file.
123    ///
124    /// ## Reader / writer Synchronization
125    ///
126    /// Since this wrapper takes over control of the write operation, readers
127    /// will only be woken up on a call to [`SharedFileWriter::sync_data`],
128    /// [`SharedFileWriter::sync_all`] or [`SharedFileWriter::flush`]. This is to
129    /// ensure that data is actually written to the underlying buffer before
130    /// the readers attempt to read it back.
131    ///
132    /// ## Writer finalization
133    ///
134    /// <div class="warning">User code must make sure to manually sync to disk before dropping the writer.</div>
135    ///
136    /// When a writer is dropped, it will move the state of the [`SharedFile`] to
137    /// [`WriteState::Completed`]. It is important to note that drop is not asynchronous
138    /// and therefore no flush to disk can be performed on the wrapped file.
139    ///
140    /// ## One writer at a time
141    ///
142    /// This operation can result in odd behavior if the
143    /// file is accessed multiple times for write access. User code
144    /// must make sure that only one meaningful write is performed at
145    /// the same time.
146    pub async fn writer(&self) -> Result<SharedFileWriter<T::Type>, T::OpenError> {
147        let file = self.sentinel.original.open_rw().await?;
148        Ok(SharedFileWriter::new(file, self.sentinel.clone()))
149    }
150
151    /// Creates a reader for the file.
152    pub async fn reader(&self) -> Result<SharedFileReader<T::Type>, T::OpenError> {
153        let file = self.sentinel.original.open_ro().await?;
154        Ok(SharedFileReader::new(file, self.sentinel.clone()))
155    }
156}
157
158impl<T> From<T> for SharedFile<T> {
159    fn from(value: T) -> Self {
160        Self {
161            sentinel: Arc::new(Sentinel {
162                original: value,
163                state: AtomicCell::new(WriteState::Pending(0, 0)),
164                wakers: Mutex::new(HashMap::default()),
165            }),
166        }
167    }
168}
169
170impl<T> Default for SharedFile<T>
171where
172    T: Default,
173{
174    fn default() -> Self {
175        Self {
176            sentinel: Arc::new(Sentinel {
177                original: T::default(),
178                state: AtomicCell::new(WriteState::Pending(0, 0)),
179                wakers: Mutex::new(HashMap::default()),
180            }),
181        }
182    }
183}
184
185impl<T> FilePath for SharedFile<T>
186where
187    T: FilePath,
188{
189    fn file_path(&self) -> &PathBuf {
190        self.sentinel.original.file_path()
191    }
192}
193
194impl<T> Sentinel<T> {
195    fn wake_readers(&self) {
196        let mut lock = self
197            .wakers
198            .lock()
199            .expect("failed to lock waker vector for writing");
200        lock.drain().for_each(|(_id, w)| w.wake());
201    }
202
203    fn register_reader_waker(&self, id: Uuid, waker: &Waker) {
204        let mut lock = self
205            .wakers
206            .lock()
207            .expect("failed to lock waker vector for reading");
208
209        lock.entry(id)
210            .and_modify(|e| e.clone_from(waker))
211            .or_insert(waker.clone());
212    }
213
214    fn remove_reader_waker(&self, id: &Uuid) {
215        let mut lock = self.wakers.lock().expect("failed to get lock for readers");
216        lock.remove(id);
217    }
218}