//! # Disk-Based Single-Writer, Multiple-Reader In-Process File Sharing
//!
//! Functionality for single-writer, multiple-reader file operations where multiple concurrent
//! readers need to read from a file that is currently being written by the same process. The
//! intended use case is the parallel processing of byte streams with minimum (process) memory
//! requirements, e.g. in web services moving around large files.
//!
//! Normally, reading a file while it is still being written makes the read stream end
//! prematurely with EOF; the purpose of this crate is to prevent exactly that.
//!
//! Any file type can be used as a backing as long as it implements the crate's [`SharedFileType`]
//! trait, which in turn requires [`AsyncWrite`](tokio::io::AsyncWrite) and [`AsyncRead`](tokio::io::AsyncRead).
//!
//! ## Crate Features
//!
//! - `async-tempfile`: Enables the [`SharedTemporaryFile`] type via the
//! [async-tempfile](https://github.com/sunsided/async-tempfile-rs) crate. Since this is how
//! this crate was initially meant to be used, this feature is enabled by default.
#![cfg_attr(docsrs, feature(doc_cfg))]
#![allow(unsafe_code)]
mod reader;
#[cfg_attr(docsrs, doc(cfg(feature = "async-tempfile")))]
#[cfg(feature = "async-tempfile")]
mod temp_file;
mod traits;
mod writer;
use crossbeam::atomic::AtomicCell;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::task::Waker;
use uuid::Uuid;
pub use reader::{FileSize, ReadError, SharedFileReader};
pub use traits::*;
pub use writer::{CompleteWritingError, SharedFileWriter, WriteError};
/// Prelude for commonly used types and traits.
///
/// Re-exports everything from the crate's `traits` module together with [`SharedFile`].
/// When the `async-tempfile` feature is enabled, `SharedTemporaryFile` is re-exported as well.
pub mod prelude {
pub use crate::traits::*;
pub use crate::SharedFile;
// Only available (and only documented on docs.rs) with the `async-tempfile` feature.
#[cfg_attr(docsrs, doc(cfg(feature = "async-tempfile")))]
#[cfg(feature = "async-tempfile")]
pub use crate::SharedTemporaryFile;
}
#[cfg_attr(docsrs, doc(cfg(feature = "async-tempfile")))]
#[cfg(feature = "async-tempfile")]
pub use temp_file::*;
/// A file with shared read/write access for in-process file sharing.
///
/// The backing file of type `T` is held behind a reference-counted sentinel. Every handle
/// created through [`SharedFile::writer`] or [`SharedFile::reader`] receives a clone of that
/// sentinel.
///
/// ## Writer / Reader Synchronization
/// Since this wrapper takes over control of the write operation, readers
/// will only be woken up on a call to [`SharedFileWriter::sync_data`],
/// [`SharedFileWriter::sync_all`] or [`SharedFileWriter::flush`]. This is to
/// ensure that data is actually written to the underlying buffer before
/// the readers attempt to read it back.
///
/// ## Writer Finalization
/// When a writer is dropped, it will move the state of the [`SharedFile`] to
/// the crate-internal `WriteState::Completed`. It is important to note that drop is not
/// asynchronous and therefore no flush to disk can be performed on the wrapped file.
///
/// ⚠️ **User code must make sure to manually sync to disk before dropping the writer.**
#[derive(Debug)]
pub struct SharedFile<T> {
/// The sentinel value to keep the file alive. It is shared (via `Arc`) with every
/// reader and writer created from this instance.
sentinel: Arc<Sentinel<T>>,
}
/// Shared state referenced by a [`SharedFile`] and by the reader and writer handles
/// created from it.
///
/// It bundles the original backing file, the current [`WriteState`] and the wakers of
/// readers that asked to be notified.
#[derive(Debug)]
struct Sentinel<T> {
/// The original file. This keeps the file open until all references are dropped.
original: T,
/// The state of the write operation. Starts out as `WriteState::Pending(0, 0)`.
state: AtomicCell<WriteState>,
/// Wakers to wake up all interested readers, keyed by the ID each reader registers with.
/// The map is emptied whenever the readers are woken.
wakers: Mutex<HashMap<Uuid, Waker>>,
}
/// The state of a file write operation.
///
/// A freshly created [`SharedFile`] starts in `Pending(0, 0)`.
// NOTE(review): kept `Copy`, presumably so the value can be loaded out of the `AtomicCell`
// it is stored in — confirm against the reader/writer modules.
#[derive(Debug, Clone, Copy)]
enum WriteState {
/// The write operation is pending. Contains the committed byte count and the written byte count.
///
/// Field order: `Pending(committed, written)`.
Pending(usize, usize),
/// The write operation completed. Contains the number of bytes written (and committed).
Completed(usize),
/// The write operation failed.
Failed,
}
impl<T> SharedFile<T>
where
T: SharedFileType<Type = T>,
{
/// Synchronously creates a new temporary file.
pub fn new() -> Result<SharedFile<T>, T::Error>
where
T: NewFile<Target = T>,
{
let file = T::new()?;
Ok(Self::from(file))
}
/// Asynchronously creates a new temporary file.
pub async fn new_async() -> Result<SharedFile<T>, T::Error>
where
T: AsyncNewFile<Target = T>,
{
let file = T::new_async().await?;
Ok(Self::from(file))
}
/// Creates a writer for the file.
///
/// ## Reader / writer Synchronization
///
/// Since this wrapper takes over control of the write operation, readers
/// will only be woken up on a call to [`SharedFileWriter::sync_data`],
/// [`SharedFileWriter::sync_all`] or [`SharedFileWriter::flush`]. This is to
/// ensure that data is actually written to the underlying buffer before
/// the readers attempt to read it back.
///
/// ## Writer finalization
///
/// ⚠️ **User code must make sure to manually sync to disk before dropping the writer.**
///
/// When a writer is dropped, it will move the state of the [`SharedFile`] to
/// [`WriteState::Completed`]. It is important to note that drop is not asynchronous
/// and therefore no flush to disk can be performed on the wrapped file.
///
/// ## One writer at a time
///
/// This operation can result in odd behavior if the
/// file is accessed multiple times for write access. User code
/// must make sure that only one meaningful write is performed at
/// the same time.
pub async fn writer(&self) -> Result<SharedFileWriter<T::Type>, T::OpenError> {
let file = self.sentinel.original.open_rw().await?;
Ok(SharedFileWriter::new(file, self.sentinel.clone()))
}
/// Creates a reader for the file.
pub async fn reader(&self) -> Result<SharedFileReader<T::Type>, T::OpenError> {
let file = self.sentinel.original.open_ro().await?;
Ok(SharedFileReader::new(file, self.sentinel.clone()))
}
}
impl<T> From<T> for SharedFile<T> {
fn from(value: T) -> Self {
Self {
sentinel: Arc::new(Sentinel {
original: value,
state: AtomicCell::new(WriteState::Pending(0, 0)),
wakers: Mutex::new(HashMap::default()),
}),
}
}
}
impl<T> Default for SharedFile<T>
where
T: Default,
{
fn default() -> Self {
Self {
sentinel: Arc::new(Sentinel {
original: T::default(),
state: AtomicCell::new(WriteState::Pending(0, 0)),
wakers: Mutex::new(HashMap::default()),
}),
}
}
}
impl<T> FilePath for SharedFile<T>
where
    T: FilePath,
{
    /// Returns the path reported by the wrapped backing file.
    fn file_path(&self) -> &PathBuf {
        let backing: &T = &self.sentinel.original;
        backing.file_path()
    }
}
impl<T> Sentinel<T> {
    /// Wakes every reader that currently has a waker registered and clears the registry.
    ///
    /// The wakers are moved out of the map while the mutex is held, but they are only
    /// invoked after the guard has been released. A waker is arbitrary code: if it
    /// synchronously calls back into [`Sentinel::register_reader_waker`] or
    /// [`Sentinel::remove_reader_waker`], waking under the (non-reentrant) lock would
    /// deadlock or panic.
    ///
    /// # Panics
    ///
    /// Panics if the waker mutex is poisoned.
    fn wake_readers(&self) {
        let pending = {
            let mut lock = self
                .wakers
                .lock()
                .expect("failed to lock waker vector for writing");
            std::mem::take(&mut *lock)
        };
        pending.into_values().for_each(Waker::wake);
    }

    /// Registers `waker` for the reader identified by `id`, replacing any waker that was
    /// previously stored under the same ID.
    ///
    /// The waker is only cloned when it is actually stored.
    ///
    /// # Panics
    ///
    /// Panics if the waker mutex is poisoned.
    fn register_reader_waker(&self, id: Uuid, waker: &Waker) {
        let mut lock = self
            .wakers
            .lock()
            .expect("failed to lock waker vector for reading");
        lock.entry(id)
            .and_modify(|registered| registered.clone_from(waker))
            .or_insert_with(|| waker.clone());
    }

    /// Removes the waker registered for the reader identified by `id`, if there is one.
    ///
    /// # Panics
    ///
    /// Panics if the waker mutex is poisoned.
    fn remove_reader_waker(&self, id: &Uuid) {
        let mut lock = self.wakers.lock().expect("failed to get lock for readers");
        lock.remove(id);
    }
}