shared_files/lib.rs
1//! # Disk-Based Single-Writer, Multiple-Reader In-Process File Sharing
2//!
3//! Functionality for single-writer, multiple-reader file operations where multiple concurrent
4//! readers need to read from a file that is currently being written by the same process. The
5//! intended use case is the parallel processing of byte streams with minimum (process) memory
6//! requirements, e.g. in web services moving around large files.
7//!
//! Normally, reading a file while it is still being written results in the read stream ending
//! prematurely at EOF; the purpose of this crate is to prevent exactly that.
10//!
11//! Any file type can be used as a backing as long as it implements the crate's [`SharedFileType`]
12//! trait, which in turn requires [`AsyncWrite`](tokio::io::AsyncWrite) and [`AsyncRead`](tokio::io::AsyncRead).
13//!
14//! ## Crate Features
15//!
16//! - `async-tempfile`: Enables the [`SharedTemporaryFile`] type via the
17//! [async-tempfile](https://github.com/sunsided/async-tempfile-rs) crate. Since this is how
18//! this crate was initially meant to be used, this feature is enabled by default.
19
20#![cfg_attr(docsrs, feature(doc_cfg))]
21#![allow(unsafe_code)]
22
23mod reader;
24
25mod errors;
26#[cfg_attr(docsrs, doc(cfg(feature = "async-tempfile")))]
27#[cfg(feature = "async-tempfile")]
28mod temp_file;
29mod traits;
30mod writer;
31
32use crossbeam::atomic::AtomicCell;
33use std::collections::HashMap;
34use std::path::PathBuf;
35use std::sync::{Arc, Mutex};
36use std::task::Waker;
37use uuid::Uuid;
38
39pub use reader::{FileSize, SharedFileReader};
40pub use traits::*;
41pub use writer::SharedFileWriter;
42
43/// Prelude for commonly used types and traits.
44pub mod prelude {
45 pub use crate::errors::*;
46 pub use crate::traits::*;
47 pub use crate::SharedFile;
48
49 #[cfg_attr(docsrs, doc(cfg(feature = "async-tempfile")))]
50 #[cfg(feature = "async-tempfile")]
51 pub use crate::SharedTemporaryFile;
52}
53
54#[cfg_attr(docsrs, doc(cfg(feature = "async-tempfile")))]
55#[cfg(feature = "async-tempfile")]
56pub use temp_file::*;
57
58/// A file with shared read/write access for in-process file sharing.
59///
60/// ## Writer / Reader Synchronization
61/// Since this wrapper takes over control of the write operation, readers
62/// will only be woken up on a call to [`SharedFileWriter::sync_data`],
63/// [`SharedFileWriter::sync_all`] or [`SharedFileWriter::flush`]. This is to
64/// ensure that data is actually written to the underlying buffer before
65/// the readers attempt to read it back.
66///
67/// ## Writer Finalization
68/// When a writer is dropped, it will move the state of the [`SharedFile`] to
69/// [`WriteState::Completed`]. It is important to note that drop is not asynchronous
70/// and therefore no flush to disk can be performed on the wrapped file.
71///
72/// <div class="warning">User code must make sure to manually sync to disk before dropping the writer.</div>
73#[derive(Debug)]
74pub struct SharedFile<T> {
75 /// The sentinel value to keep the file alive.
76 sentinel: Arc<Sentinel<T>>,
77}
78
79#[derive(Debug)]
80struct Sentinel<T> {
81 /// The original file. This keeps the file open until all references are dropped.
82 original: T,
83 /// The state of the write operation.
84 state: AtomicCell<WriteState>,
85 /// Wakers to wake up all interested readers.
86 wakers: Mutex<HashMap<Uuid, Waker>>,
87}
88
/// The state of a file write operation.
///
/// The type is `Copy` so it can be stored in and loaded from an atomic cell
/// by value. It is `Eq` so two states can be compared directly, for example
/// in a compare-and-swap or in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriteState {
    /// The write operation is pending. Contains the committed byte count and the written byte count,
    /// in that order.
    Pending(usize, usize),
    /// The write operation completed. Contains the number of bytes written (and committed).
    Completed(usize),
    /// The write operation failed.
    Failed,
}
99
100impl<T> SharedFile<T>
101where
102 T: SharedFileType<Type = T>,
103{
104 /// Synchronously creates a new temporary file.
105 pub fn new() -> Result<SharedFile<T>, T::Error>
106 where
107 T: NewFile<Target = T>,
108 {
109 let file = T::new()?;
110 Ok(Self::from(file))
111 }
112
113 /// Asynchronously creates a new temporary file.
114 pub async fn new_async() -> Result<SharedFile<T>, T::Error>
115 where
116 T: AsyncNewFile<Target = T>,
117 {
118 let file = T::new_async().await?;
119 Ok(Self::from(file))
120 }
121
122 /// Creates a writer for the file.
123 ///
124 /// ## Reader / writer Synchronization
125 ///
126 /// Since this wrapper takes over control of the write operation, readers
127 /// will only be woken up on a call to [`SharedFileWriter::sync_data`],
128 /// [`SharedFileWriter::sync_all`] or [`SharedFileWriter::flush`]. This is to
129 /// ensure that data is actually written to the underlying buffer before
130 /// the readers attempt to read it back.
131 ///
132 /// ## Writer finalization
133 ///
134 /// <div class="warning">User code must make sure to manually sync to disk before dropping the writer.</div>
135 ///
136 /// When a writer is dropped, it will move the state of the [`SharedFile`] to
137 /// [`WriteState::Completed`]. It is important to note that drop is not asynchronous
138 /// and therefore no flush to disk can be performed on the wrapped file.
139 ///
140 /// ## One writer at a time
141 ///
142 /// This operation can result in odd behavior if the
143 /// file is accessed multiple times for write access. User code
144 /// must make sure that only one meaningful write is performed at
145 /// the same time.
146 pub async fn writer(&self) -> Result<SharedFileWriter<T::Type>, T::OpenError> {
147 let file = self.sentinel.original.open_rw().await?;
148 Ok(SharedFileWriter::new(file, self.sentinel.clone()))
149 }
150
151 /// Creates a reader for the file.
152 pub async fn reader(&self) -> Result<SharedFileReader<T::Type>, T::OpenError> {
153 let file = self.sentinel.original.open_ro().await?;
154 Ok(SharedFileReader::new(file, self.sentinel.clone()))
155 }
156}
157
158impl<T> From<T> for SharedFile<T> {
159 fn from(value: T) -> Self {
160 Self {
161 sentinel: Arc::new(Sentinel {
162 original: value,
163 state: AtomicCell::new(WriteState::Pending(0, 0)),
164 wakers: Mutex::new(HashMap::default()),
165 }),
166 }
167 }
168}
169
170impl<T> Default for SharedFile<T>
171where
172 T: Default,
173{
174 fn default() -> Self {
175 Self {
176 sentinel: Arc::new(Sentinel {
177 original: T::default(),
178 state: AtomicCell::new(WriteState::Pending(0, 0)),
179 wakers: Mutex::new(HashMap::default()),
180 }),
181 }
182 }
183}
184
185impl<T> FilePath for SharedFile<T>
186where
187 T: FilePath,
188{
189 fn file_path(&self) -> &PathBuf {
190 self.sentinel.original.file_path()
191 }
192}
193
194impl<T> Sentinel<T> {
195 fn wake_readers(&self) {
196 let mut lock = self
197 .wakers
198 .lock()
199 .expect("failed to lock waker vector for writing");
200 lock.drain().for_each(|(_id, w)| w.wake());
201 }
202
203 fn register_reader_waker(&self, id: Uuid, waker: &Waker) {
204 let mut lock = self
205 .wakers
206 .lock()
207 .expect("failed to lock waker vector for reading");
208
209 lock.entry(id)
210 .and_modify(|e| e.clone_from(waker))
211 .or_insert(waker.clone());
212 }
213
214 fn remove_reader_waker(&self, id: &Uuid) {
215 let mut lock = self.wakers.lock().expect("failed to get lock for readers");
216 lock.remove(id);
217 }
218}