sync_file/
lib.rs

1//! Files that can be read concurrently.
2//!
3//! [`std::fs::File`] is `Sync` but reading concurrently from it results in race
4//! conditions, because the OS has a single cursor which is advanced and used
5//! by several threads.
6//!
7//! [`SyncFile`] solves this problem by using platform-specific extensions to
8//! do positional I/O, so the cursor of the file is not shared. Note that
9//! writing concurrently at the same position in a file can still result in race
10//! conditions, but only on the content, not the position.
11//!
//! This library also exposes platform-independent functions for positional I/O.
13//!
14//! # Example
15//!
16//! ```
17//! use std::io::Read;
18//! use sync_file::SyncFile;
19//! # use std::io::Write;
20//! # let mut f = SyncFile::create("hello.txt")?;
21//! # f.write_all(b"Hello World!\n")?;
22//! # drop(f);
23//!
24//! /// Reads a file byte by byte.
//! /// Don't do this in real code!
26//! fn read_all<R: Read>(mut file: R) -> std::io::Result<Vec<u8>> {
27//!     let mut result = Vec::new();
28//!     let mut buf = [0];
29//!
30//!     while file.read(&mut buf)? != 0 {
31//!         result.extend(&buf);
32//!     }
33//!
34//!     Ok(result)
35//! }
36//!
37//! // Open a file
38//! let f = SyncFile::open("hello.txt")?;
39//! let f_clone = f.clone();
40//!
41//! // Read it concurrently
42//! let thread = std::thread::spawn(move || read_all(f_clone));
43//! let res1 = read_all(f)?;
44//! let res2 = thread.join().unwrap()?;
45//!
46//! // Both clones read the whole content
47//! // This would not work with `std::fs::File`
48//! assert_eq!(res1, b"Hello World!\n");
49//! assert_eq!(res2, b"Hello World!\n");
50//!
51//! # std::fs::remove_file("hello.txt")?;
52//! # Ok::<_, std::io::Error>(())
53//! ```
54//!
55//! # OS support
56//!
57//! Windows, Unix and `wasip1` targets provide extensions for positional I/O,
58//! so on these targets `SyncFile` is zero-cost.
59//!
//! If platform-specific extensions are not available, `SyncFile` falls back to a
//! mutex.
62
63#![warn(missing_docs)]
64
65mod adapter;
66mod file;
67
68pub use adapter::Adapter;
69pub use file::{RandomAccessFile, SyncFile};
70
71use std::{cmp::min, convert::TryInto, io};
72
73/// The `ReadAt` trait allows for reading bytes from a source at a given offset.
74///
75/// Additionally, the methods of this trait only require a shared reference,
76/// which makes it ideal for parallel use.
77pub trait ReadAt {
78    /// Reads a number of bytes starting from a given offset.
79    ///
80    /// Returns the number of bytes read.
81    ///
82    /// Note that similar to [`io::Read::read`], it is not an error to return with
83    /// a short read.
84    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize>;
85
86    /// Reads the exact number of byte required to fill buf from the given
87    /// offset.
88    ///
89    /// # Errors
90    ///
91    /// If this function encounters an error of the kind
92    /// [`io::ErrorKind::Interrupted`] then the error is ignored and the
93    /// operation will continue.
94    ///
95    /// If this function encounters an “end of file” before completely filling
96    /// the buffer, it returns an error of the kind
97    /// [`io::ErrorKind::UnexpectedEof`]. The contents of buf are unspecified
98    /// in this case.
99    ///
100    /// If any other read error is encountered then this function immediately
101    /// returns. The contents of buf are unspecified in this case.
102    fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> io::Result<()> {
103        while !buf.is_empty() {
104            match self.read_at(buf, offset) {
105                Ok(0) => break,
106                Ok(n) => {
107                    buf = &mut buf[n..];
108                    offset += n as u64;
109                }
110                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
111                Err(e) => return Err(e),
112            }
113        }
114        if buf.is_empty() {
115            Ok(())
116        } else {
117            Err(fill_buffer_error())
118        }
119    }
120
121    /// Like `read_at`, except that it reads into a slice of buffers.
122    ///
123    /// Data is copied to fill each buffer in order, with the final buffer
124    /// written to possibly being only partially filled. This method must behave
125    /// equivalently to a single call to read with concatenated buffers.
126    fn read_vectored_at(&self, bufs: &mut [io::IoSliceMut<'_>], offset: u64) -> io::Result<usize> {
127        let buf = bufs
128            .iter_mut()
129            .find(|b| !b.is_empty())
130            .map_or(&mut [][..], |b| &mut **b);
131        self.read_at(buf, offset)
132    }
133}
134
135impl ReadAt for [u8] {
136    #[inline]
137    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
138        let read = (|| {
139            let offset = offset.try_into().ok()?;
140            let this = self.get(offset..)?;
141            let len = min(this.len(), buf.len());
142
143            buf[..len].copy_from_slice(&this[..len]);
144            Some(len)
145        })();
146
147        Ok(read.unwrap_or(0))
148    }
149
150    #[inline]
151    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
152        (|| {
153            let offset = offset.try_into().ok()?;
154            let this = self.get(offset..)?;
155            let len = buf.len();
156            (this.len() >= len).then(|| buf.copy_from_slice(&this[..len]))
157        })()
158        .ok_or_else(fill_buffer_error)
159    }
160}
161
162impl<const N: usize> ReadAt for [u8; N] {
163    #[inline]
164    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
165        self.as_ref().read_at(buf, offset)
166    }
167
168    #[inline]
169    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
170        self.as_ref().read_exact_at(buf, offset)
171    }
172
173    #[inline]
174    fn read_vectored_at(&self, bufs: &mut [io::IoSliceMut<'_>], offset: u64) -> io::Result<usize> {
175        self.as_ref().read_vectored_at(bufs, offset)
176    }
177}
178
179impl ReadAt for Vec<u8> {
180    #[inline]
181    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
182        (**self).read_at(buf, offset)
183    }
184
185    #[inline]
186    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
187        (**self).read_exact_at(buf, offset)
188    }
189
190    #[inline]
191    fn read_vectored_at(&self, bufs: &mut [io::IoSliceMut<'_>], offset: u64) -> io::Result<usize> {
192        (**self).read_vectored_at(bufs, offset)
193    }
194}
195
196impl ReadAt for std::borrow::Cow<'_, [u8]> {
197    #[inline]
198    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
199        (**self).read_at(buf, offset)
200    }
201
202    #[inline]
203    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
204        (**self).read_exact_at(buf, offset)
205    }
206
207    #[inline]
208    fn read_vectored_at(&self, bufs: &mut [io::IoSliceMut<'_>], offset: u64) -> io::Result<usize> {
209        (**self).read_vectored_at(bufs, offset)
210    }
211}
212
213impl<R> ReadAt for &R
214where
215    R: ReadAt + ?Sized,
216{
217    #[inline]
218    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
219        (**self).read_at(buf, offset)
220    }
221
222    #[inline]
223    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
224        (**self).read_exact_at(buf, offset)
225    }
226
227    #[inline]
228    fn read_vectored_at(&self, bufs: &mut [io::IoSliceMut<'_>], offset: u64) -> io::Result<usize> {
229        (**self).read_vectored_at(bufs, offset)
230    }
231}
232
233impl<R> ReadAt for Box<R>
234where
235    R: ReadAt + ?Sized,
236{
237    #[inline]
238    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
239        (**self).read_at(buf, offset)
240    }
241
242    #[inline]
243    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
244        (**self).read_exact_at(buf, offset)
245    }
246
247    #[inline]
248    fn read_vectored_at(&self, bufs: &mut [io::IoSliceMut<'_>], offset: u64) -> io::Result<usize> {
249        (**self).read_vectored_at(bufs, offset)
250    }
251}
252
253impl<R> ReadAt for std::sync::Arc<R>
254where
255    R: ReadAt + ?Sized,
256{
257    #[inline]
258    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
259        (**self).read_at(buf, offset)
260    }
261
262    #[inline]
263    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
264        (**self).read_exact_at(buf, offset)
265    }
266
267    #[inline]
268    fn read_vectored_at(&self, bufs: &mut [io::IoSliceMut<'_>], offset: u64) -> io::Result<usize> {
269        (**self).read_vectored_at(bufs, offset)
270    }
271}
272
273impl<R> ReadAt for std::rc::Rc<R>
274where
275    R: ReadAt + ?Sized,
276{
277    #[inline]
278    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
279        (**self).read_at(buf, offset)
280    }
281
282    #[inline]
283    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
284        (**self).read_exact_at(buf, offset)
285    }
286
287    #[inline]
288    fn read_vectored_at(&self, bufs: &mut [io::IoSliceMut<'_>], offset: u64) -> io::Result<usize> {
289        (**self).read_vectored_at(bufs, offset)
290    }
291}
292
293impl<T> ReadAt for io::Cursor<T>
294where
295    T: AsRef<[u8]>,
296{
297    #[inline]
298    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
299        self.get_ref().as_ref().read_at(buf, offset)
300    }
301
302    #[inline]
303    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
304        self.get_ref().as_ref().read_exact_at(buf, offset)
305    }
306
307    #[inline]
308    fn read_vectored_at(&self, bufs: &mut [io::IoSliceMut<'_>], offset: u64) -> io::Result<usize> {
309        self.get_ref().as_ref().read_vectored_at(bufs, offset)
310    }
311}
312
313impl ReadAt for io::Empty {
314    #[inline]
315    fn read_at(&self, _buf: &mut [u8], _offset: u64) -> io::Result<usize> {
316        Ok(0)
317    }
318
319    #[inline]
320    fn read_exact_at(&self, buf: &mut [u8], _offset: u64) -> io::Result<()> {
321        if buf.is_empty() {
322            Ok(())
323        } else {
324            Err(fill_buffer_error())
325        }
326    }
327
328    #[inline]
329    fn read_vectored_at(&self, _: &mut [io::IoSliceMut<'_>], _: u64) -> io::Result<usize> {
330        Ok(0)
331    }
332}
333
334/// The `WriteAt` trait allows for writing bytes to a source at a given offset.
335///
336/// Additionally, the methods of this trait only require a shared reference,
337/// which makes it ideal for parallel use.
338pub trait WriteAt {
339    /// Writes a number of bytes starting from a given offset.
340    ///
341    /// Returns the number of bytes written.
342    ///
343    /// Note that similar to [`io::Write::write`], it is not an error to return a
344    /// short write.
345    fn write_at(&self, buf: &[u8], offset: u64) -> io::Result<usize>;
346
347    /// Attempts to write an entire buffer starting from a given offset.
348    ///
349    /// # Errors
350    ///
351    /// This function will return the first error of
352    /// non-[`io::ErrorKind::Interrupted`] kind that `write_at` returns.
353    fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> io::Result<()> {
354        while !buf.is_empty() {
355            match self.write_at(buf, offset) {
356                Ok(0) => {
357                    return Err(write_buffer_error());
358                }
359                Ok(n) => {
360                    buf = &buf[n..];
361                    offset += n as u64;
362                }
363                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
364                Err(e) => return Err(e),
365            }
366        }
367        Ok(())
368    }
369
370    /// Like `write_at`, except that it writes from a slice of buffers.
371    ///
372    /// Data is copied from each buffer in order, with the final buffer read
373    /// from possibly being only partially consumed. This method must behave as
374    /// a call to `write_at` with the buffers concatenated would.
375    fn write_vectored_at(&self, bufs: &[io::IoSlice<'_>], offset: u64) -> io::Result<usize> {
376        let buf = bufs
377            .iter()
378            .find(|b| !b.is_empty())
379            .map_or(&[][..], |b| &**b);
380        self.write_at(buf, offset)
381    }
382
383    /// Flush this output stream, ensuring that all intermediately buffered
384    /// contents reach their destination.
385    ///
386    /// # Errors
387    ///
388    /// It is considered an error if not all bytes could be written due to I/O
389    /// errors or EOF being reached.
390    #[inline]
391    fn flush(&self) -> io::Result<()> {
392        Ok(())
393    }
394}
395
396impl<W> WriteAt for &W
397where
398    W: WriteAt + ?Sized,
399{
400    #[inline]
401    fn write_at(&self, buf: &[u8], offset: u64) -> io::Result<usize> {
402        (**self).write_at(buf, offset)
403    }
404
405    #[inline]
406    fn write_all_at(&self, buf: &[u8], offset: u64) -> io::Result<()> {
407        (**self).write_all_at(buf, offset)
408    }
409
410    #[inline]
411    fn write_vectored_at(&self, bufs: &[io::IoSlice<'_>], offset: u64) -> io::Result<usize> {
412        (**self).write_vectored_at(bufs, offset)
413    }
414
415    #[inline]
416    fn flush(&self) -> io::Result<()> {
417        (**self).flush()
418    }
419}
420
421impl<W> WriteAt for Box<W>
422where
423    W: WriteAt + ?Sized,
424{
425    #[inline]
426    fn write_at(&self, buf: &[u8], offset: u64) -> io::Result<usize> {
427        (**self).write_at(buf, offset)
428    }
429
430    #[inline]
431    fn write_all_at(&self, buf: &[u8], offset: u64) -> io::Result<()> {
432        (**self).write_all_at(buf, offset)
433    }
434
435    #[inline]
436    fn write_vectored_at(&self, bufs: &[io::IoSlice<'_>], offset: u64) -> io::Result<usize> {
437        (**self).write_vectored_at(bufs, offset)
438    }
439
440    #[inline]
441    fn flush(&self) -> io::Result<()> {
442        (**self).flush()
443    }
444}
445
446impl<W> WriteAt for std::sync::Arc<W>
447where
448    W: WriteAt + ?Sized,
449{
450    #[inline]
451    fn write_at(&self, buf: &[u8], offset: u64) -> io::Result<usize> {
452        (**self).write_at(buf, offset)
453    }
454
455    #[inline]
456    fn write_all_at(&self, buf: &[u8], offset: u64) -> io::Result<()> {
457        (**self).write_all_at(buf, offset)
458    }
459
460    #[inline]
461    fn write_vectored_at(&self, bufs: &[io::IoSlice<'_>], offset: u64) -> io::Result<usize> {
462        (**self).write_vectored_at(bufs, offset)
463    }
464
465    #[inline]
466    fn flush(&self) -> io::Result<()> {
467        (**self).flush()
468    }
469}
470
471impl<W> WriteAt for std::rc::Rc<W>
472where
473    W: WriteAt + ?Sized,
474{
475    #[inline]
476    fn write_at(&self, buf: &[u8], offset: u64) -> io::Result<usize> {
477        (**self).write_at(buf, offset)
478    }
479
480    #[inline]
481    fn write_all_at(&self, buf: &[u8], offset: u64) -> io::Result<()> {
482        (**self).write_all_at(buf, offset)
483    }
484
485    #[inline]
486    fn write_vectored_at(&self, bufs: &[io::IoSlice<'_>], offset: u64) -> io::Result<usize> {
487        (**self).write_vectored_at(bufs, offset)
488    }
489
490    #[inline]
491    fn flush(&self) -> io::Result<()> {
492        (**self).flush()
493    }
494}
495
496impl WriteAt for io::Sink {
497    #[inline]
498    fn write_at(&self, buf: &[u8], _offset: u64) -> io::Result<usize> {
499        Ok(buf.len())
500    }
501
502    #[inline]
503    fn write_all_at(&self, _buf: &[u8], _offset: u64) -> io::Result<()> {
504        Ok(())
505    }
506
507    #[inline]
508    fn write_vectored_at(&self, bufs: &[io::IoSlice<'_>], _offset: u64) -> io::Result<usize> {
509        Ok(bufs.iter().map(|b| b.len()).sum())
510    }
511}
512
/// Reports the total length of a stream.
///
/// This is independent of any cursor position the stream may have.
pub trait Size {
    /// Returns the total size of the stream, in bytes.
    fn size(&self) -> io::Result<u64>;
}
518
519impl Size for [u8] {
520    #[inline]
521    fn size(&self) -> io::Result<u64> {
522        Ok(self.len() as u64)
523    }
524}
525
526impl<const N: usize> Size for [u8; N] {
527    #[inline]
528    fn size(&self) -> io::Result<u64> {
529        Ok(self.len() as u64)
530    }
531}
532
533impl Size for Vec<u8> {
534    #[inline]
535    fn size(&self) -> io::Result<u64> {
536        Ok(self.len() as u64)
537    }
538}
539
540impl Size for std::borrow::Cow<'_, [u8]> {
541    #[inline]
542    fn size(&self) -> io::Result<u64> {
543        Ok(self.len() as u64)
544    }
545}
546
547impl<T> Size for io::Cursor<T>
548where
549    T: AsRef<[u8]>,
550{
551    #[inline]
552    fn size(&self) -> io::Result<u64> {
553        Ok(self.get_ref().as_ref().len() as u64)
554    }
555}
556
557impl<T: Size + ?Sized> Size for &'_ T {
558    #[inline]
559    fn size(&self) -> io::Result<u64> {
560        (**self).size()
561    }
562}
563
564impl<T: Size + ?Sized> Size for Box<T> {
565    #[inline]
566    fn size(&self) -> io::Result<u64> {
567        (**self).size()
568    }
569}
570
571impl<T: Size + ?Sized> Size for std::sync::Arc<T> {
572    #[inline]
573    fn size(&self) -> io::Result<u64> {
574        (**self).size()
575    }
576}
577
578impl<T: Size + ?Sized> Size for std::rc::Rc<T> {
579    #[inline]
580    fn size(&self) -> io::Result<u64> {
581        (**self).size()
582    }
583}
584
585impl Size for io::Empty {
586    #[inline]
587    fn size(&self) -> io::Result<u64> {
588        Ok(0)
589    }
590}
591
/// Builds the error reported when a source ends before a buffer could be
/// completely filled.
#[cold]
fn fill_buffer_error() -> io::Error {
    let kind = io::ErrorKind::UnexpectedEof;
    io::Error::new(kind, "failed to fill whole buffer")
}
596
/// Builds the error reported when a destination stops accepting bytes before
/// a whole buffer has been written.
#[cold]
fn write_buffer_error() -> io::Error {
    const MESSAGE: &str = "failed to write whole buffer";
    io::Error::new(io::ErrorKind::WriteZero, MESSAGE)
}
601
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::prelude::*;

    /// Exercises the cursor-based `Read`/`Seek` API of `SyncFile` on a real file.
    #[test]
    fn smoke_test() {
        let mut file = SyncFile::open("LICENSE-APACHE").unwrap();

        // The license text starts with "Copyright".
        let mut word = [0; 9];
        file.read_exact(&mut word).unwrap();
        assert_eq!(&word, b"Copyright");
        assert_eq!(file.stream_position().unwrap(), 9);

        // Step back two bytes and read them again.
        let rewound_to = file.seek(io::SeekFrom::Current(-2)).unwrap();
        assert_eq!(rewound_to, 7);
        let tail = &mut word[..2];
        file.read_exact(tail).unwrap();
        assert_eq!(tail, b"ht");

        // Seeking before the start of the file must fail.
        assert!(file.seek(io::SeekFrom::Current(-10)).is_err());
    }
}