// atom_file crate — lib.rs

//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
//!
//! [`BasicAtomicFile`] is a non-async alternative.
//!
//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
//!
//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.
//!
//! # Features
//!
//! This crate supports the following cargo features:
//! - `pstd` : Use pstd crate for `BTreeMap` (allocated in `GTemp`).
//! - `unsafe-optim` : Enable unsafe optimisations in release mode.

15#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24    VecA,
25    collections::{BTreeMapA, btree_map::CustomTuning},
26    localalloc::GTemp,
27    veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
// With the `pstd` feature, BTreeMap is an allocator-aware map tuned for `GTemp`.
#[cfg(feature = "pstd")]
type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;

// With the `pstd` feature, GVec is a Vec allocated in `GTemp`.
#[cfg(feature = "pstd")]
type GVec<T> = VecA<T, GTemp>;

/// ```Arc<Vec<u8>>``` — shared, immutable byte buffer passed between maps and storage.
pub type Data = Arc<Vec<u8>>;
41
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes is sent to be written to underlying
/// storage, and is also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates are written here.
    map: WMap,
    /// Underlying file, with previous updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size ( updated by commit ).
    size: u64,
    /// For sending update maps to the background save thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by the background save thread while it is active; wait_complete blocks on it.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map ( back-pressure threshold ).
    map_lim: usize,
}
72
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct AtomicFile with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        // The BasicAtomicFile owned by the save thread does the actual two-phase commits.
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // The loop ( and thread ) ends when the sender is dropped.
            while let Ok((size, map)) = rx.recv() {
                // Guard is held for the whole save, so wait_complete can block on busy.
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Decrement outstanding-commit count; CommitFile state is reset at zero.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
114
impl Storage for AtomicFile {
    /// Not supported: an AtomicFile owns its save thread and cannot be shared — panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
120
impl BasicStorage for AtomicFile {
    /// Queue the current batch of writes for asynchronous save; size is the new file size.
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Back-pressure: if the CommitFile map has grown past map_lim,
        // wait for the save thread to drain outstanding commits first.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile, so reads see them immediately.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    /// Read, overlaying uncommitted writes on the CommitFile view.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    /// Block until the save thread has applied all outstanding commits.
    fn wait_complete(&self) {
        // Acquiring busy blocks while the save thread is mid-commit;
        // re-check todo each time the lock is obtained.
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
163
/// View of the underlying storage plus all committed-but-not-yet-saved updates.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates ( cleared when todo reaches zero ).
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
172
173impl CommitFile {
174    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
175        Self {
176            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
177            map: WMap::default(),
178            todo: 0,
179        }
180    }
181
182    fn done_one(&mut self) {
183        self.todo -= 1;
184        if self.todo == 0 {
185            self.map = WMap::default();
186            self.stg.reset();
187        }
188    }
189}
190
191impl BasicStorage for CommitFile {
192    fn commit(&mut self, _size: u64) {
193        panic!()
194    }
195
196    fn size(&self) -> u64 {
197        panic!()
198    }
199
200    fn read(&self, start: u64, data: &mut [u8]) {
201        self.map.read(start, data, &self.stg);
202    }
203
204    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
205        self.map.write(start, data, off, len);
206    }
207
208    fn write(&mut self, _start: u64, _data: &[u8]) {
209        panic!()
210    }
211}
212
/// Storage interface - BasicStorage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait BasicStorage: Send {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec ( default wraps it in an Arc and delegates to write_data ).
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice ( default delegates to write ).
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 in little-endian representation.
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 ( little-endian ).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Wait until current writes are complete ( default is a no-op for synchronous storage ).
    fn wait_complete(&self) {}
}
257
/// BasicStorage with Sync and clone ( shareable across threads ).
pub trait Storage: BasicStorage + Sync {
    /// Clone, producing a new handle to the same underlying storage.
    fn clone(&self) -> Box<dyn Storage>;
}
263
/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>>`.
#[derive(Default)]
pub struct MemFile {
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Get a new (boxed) MemFile with an empty buffer.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
276
277impl Storage for MemFile {
278    fn clone(&self) -> Box<dyn Storage> {
279        Box::new(Self { v: self.v.clone() })
280    }
281}
282
283impl BasicStorage for MemFile {
284    fn size(&self) -> u64 {
285        let v = self.v.lock().unwrap();
286        v.len() as u64
287    }
288
289    fn read(&self, off: u64, bytes: &mut [u8]) {
290        let off = off as usize;
291        let len = bytes.len();
292        let mut v = self.v.lock().unwrap();
293        if off + len > v.len() {
294            v.resize(off + len, 0);
295        }
296        bytes.copy_from_slice(&v[off..off + len]);
297    }
298
299    fn write(&mut self, off: u64, bytes: &[u8]) {
300        let off = off as usize;
301        let len = bytes.len();
302        let mut v = self.v.lock().unwrap();
303        if off + len > v.len() {
304            v.resize(off + len, 0);
305        }
306        v[off..off + len].copy_from_slice(bytes);
307    }
308
309    fn commit(&mut self, size: u64) {
310        let mut v = self.v.lock().unwrap();
311        v.resize(size as usize, 0);
312    }
313}
314
315use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
316
/// Thin wrapper around [`std::fs::File`] providing offset-based read/write.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Open ( creating if necessary, without truncating ) the named file for read/write.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// File size in bytes.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read bytes at offset off. Bytes past end-of-file are left unchanged in the
    /// output buffer ( callers rely on reads past EOF not failing ).
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // Loop to handle short reads ( a single read may return fewer bytes than
        // requested ); stop cleanly at EOF when read returns 0.
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                break;
            }
            done += n;
        }
    }

    /// Write bytes at offset off.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all rather than write: a single write call may be short,
        // which would silently drop the tail of the data.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length and sync all data to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
362
/// For atomic upd file, if not unix or windows.
/// Uses a Cell so the methods can take &self; not Sync, so single-threaded use only.
pub struct UpdFileStorage {
    // Invariant: always Some between method calls; methods take the file
    // out of the cell, use it, then put it back.
    file: Cell<Option<FileInner>>,
}

impl UpdFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: Cell::new(Some(FileInner::new(filename))),
        })
    }
}
376
impl BasicStorage for UpdFileStorage {
    // Each method temporarily takes the FileInner out of the Cell and restores it.
    // The unwrap would only panic on a reentrant call, which cannot happen here
    // since FileInner methods do not call back into this type.
    fn size(&self) -> u64 {
        let mut f = self.file.take().unwrap();
        let result = f.size();
        self.file.set(Some(f));
        result
    }
    fn read(&self, off: u64, bytes: &mut [u8]) {
        let mut f = self.file.take().unwrap();
        f.read(off, bytes);
        self.file.set(Some(f));
    }

    fn write(&mut self, off: u64, bytes: &[u8]) {
        let mut f = self.file.take().unwrap();
        f.write(off, bytes);
        self.file.set(Some(f));
    }

    fn commit(&mut self, size: u64) {
        let mut f = self.file.take().unwrap();
        f.commit(size);
        self.file.set(Some(f));
    }
}
402
403/// Simple implementation of [Storage] using [`std::fs::File`].
404pub struct SimpleFileStorage {
405    file: Arc<Mutex<FileInner>>,
406}
407
408impl SimpleFileStorage {
409    /// Construct from filename.
410    pub fn new(filename: &str) -> Box<Self> {
411        Box::new(Self {
412            file: Arc::new(Mutex::new(FileInner::new(filename))),
413        })
414    }
415}
416
417impl Storage for SimpleFileStorage {
418    fn clone(&self) -> Box<dyn Storage> {
419        Box::new(Self {
420            file: self.file.clone(),
421        })
422    }
423}
424
425impl BasicStorage for SimpleFileStorage {
426    fn size(&self) -> u64 {
427        self.file.lock().unwrap().size()
428    }
429
430    fn read(&self, off: u64, bytes: &mut [u8]) {
431        self.file.lock().unwrap().read(off, bytes);
432    }
433
434    fn write(&mut self, off: u64, bytes: &[u8]) {
435        self.file.lock().unwrap().write(off, bytes);
436    }
437
438    fn commit(&mut self, size: u64) {
439        self.file.lock().unwrap().commit(size);
440    }
441}
442
443/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
444pub struct AnyFileStorage {
445    filename: String,
446    files: Arc<Mutex<Vec<FileInner>>>,
447}
448
449impl AnyFileStorage {
450    /// Create new.
451    pub fn new(filename: &str) -> Box<Self> {
452        Box::new(Self {
453            filename: filename.to_owned(),
454            files: Arc::new(Mutex::new(Vec::new())),
455        })
456    }
457
458    fn get_file(&self) -> FileInner {
459        match self.files.lock().unwrap().pop() {
460            Some(f) => f,
461            _ => FileInner::new(&self.filename),
462        }
463    }
464
465    fn put_file(&self, f: FileInner) {
466        self.files.lock().unwrap().push(f);
467    }
468}
469
470impl Storage for AnyFileStorage {
471    fn clone(&self) -> Box<dyn Storage> {
472        Box::new(Self {
473            filename: self.filename.clone(),
474            files: self.files.clone(),
475        })
476    }
477}
478
479impl BasicStorage for AnyFileStorage {
480    fn size(&self) -> u64 {
481        let mut f = self.get_file();
482        let result = f.size();
483        self.put_file(f);
484        result
485    }
486
487    fn read(&self, off: u64, bytes: &mut [u8]) {
488        let mut f = self.get_file();
489        f.read(off, bytes);
490        self.put_file(f);
491    }
492
493    fn write(&mut self, off: u64, bytes: &[u8]) {
494        let mut f = self.get_file();
495        f.write(off, bytes);
496        self.put_file(f);
497    }
498
499    fn commit(&mut self, size: u64) {
500        let mut f = self.get_file();
501        f.commit(size);
502        self.put_file(f);
503    }
504}
505
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
pub struct DummyFile {}

impl DummyFile {
    /// Construct.
    pub fn new() -> Box<Self> {
        Box::new(DummyFile {})
    }
}
514
515impl Storage for DummyFile {
516    fn clone(&self) -> Box<dyn Storage> {
517        Self::new()
518    }
519}
520
521impl BasicStorage for DummyFile {
522    fn size(&self) -> u64 {
523        0
524    }
525
526    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
527
528    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
529
530    fn commit(&mut self, _size: u64) {}
531}
532
/// Memory configuration limits for [`AtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    fn default() -> Self {
        Limits {
            map_lim: 5000,
            rbuf_mem: 0x20_0000,
            swbuf: 0x10_0000,
            uwbuf: 0x10_0000,
        }
    }
}
556
/// Write Buffer — coalesces sequential writes before passing them to the underlying storage.
struct WriteBuffer {
    /// Current write index into buf ( number of buffered bytes ).
    ix: usize,
    /// File position the buffered bytes start at; u64::MAX means "no position yet".
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer ( fixed size, allocated once ).
    buf: Vec<u8>,
}
568
impl WriteBuffer {
    /// Construct with a buffer of buf_size bytes.
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            // u64::MAX marks "no buffered position"; the first write always flushes
            // ( a no-op flush since ix == 0 ) and establishes pos.
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset; non-sequential writes force a flush first.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // If off is not exactly where buffered data ends, flush and restart at off.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // Space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: flush, restarting at the current logical position.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write buffered bytes ( if any ) to storage and set the new buffer position.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Flush and commit to the underlying storage.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 ( little-endian ).
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
622
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers ( sector cache, guarded by a mutex for shared reads ).
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small; larger reads bypass the cache.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    /// Release the cached buffers when the storage wrapper is dropped.
    fn drop(&mut self) {
        self.reset();
    }
}
642
643impl<const N: usize> ReadBufStg<N> {
644    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
645    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
646        Self {
647            stg,
648            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
649            limit,
650        }
651    }
652
653    /// Clears the buffers.
654    fn reset(&mut self) {
655        self.buf.lock().unwrap().reset();
656    }
657}
658
659impl<const N: usize> BasicStorage for ReadBufStg<N> {
660    /// Read data from storage.
661    fn read(&self, start: u64, data: &mut [u8]) {
662        if data.len() <= self.limit {
663            self.buf.lock().unwrap().read(&*self.stg, start, data);
664        } else {
665            self.stg.read(start, data);
666        }
667    }
668
669    /// Panics.
670    fn size(&self) -> u64 {
671        panic!()
672    }
673
674    /// Panics.
675    fn write(&mut self, _start: u64, _data: &[u8]) {
676        panic!();
677    }
678
679    /// Panics.
680    fn commit(&mut self, _size: u64) {
681        panic!();
682    }
683}
684
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers of N bytes.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers; cache is cleared when reached.
    max_buf: usize,
}

impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read data at off, filling it sector by sector from the cache,
    /// loading missing sectors from stg.
    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            // Absolute offset of the next unread byte ( shadows outer off ).
            let off = off + done as u64;
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            // Copy at most to the end of this sector.
            let amount = min(data.len() - done, N - disp);

            // Load the sector from stg on cache miss.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction: drop the whole cache once it reaches max_buf sectors.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
725
726#[derive(Default)]
727/// Slice of Data to be written to storage.
728struct DataSlice {
729    /// Slice data.
730    pub data: Data,
731    /// Start of slice.
732    pub off: usize,
733    /// Length of slice.
734    pub len: usize,
735}
736
737impl DataSlice {
738    /// Get reference to the whole slice.
739    pub fn all(&self) -> &[u8] {
740        &self.data[self.off..self.off + self.len]
741    }
742    /// Get reference to part of slice.
743    pub fn part(&self, off: usize, len: usize) -> &[u8] {
744        &self.data[self.off + off..self.off + off + len]
745    }
746    /// Trim specified amount from start of slice.
747    pub fn trim(&mut self, trim: usize) {
748        self.off += trim;
749        self.len -= trim;
750    }
751    /// Take the data.
752    #[allow(dead_code)]
753    pub fn take(&mut self) -> Data {
754        std::mem::take(&mut self.data)
755    }
756}
757
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end offset of the slice
    /// ( so a range query from start+1 finds all potentially overlapping writes ).
    map: BTreeMap<u64, DataSlice>,
}
764
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs, in offset order.
    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = GVec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are slice end offsets; recover the start offset.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only entries whose end key is > start can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Mutations are deferred to here because the range iteration borrows the map.
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    /// Cursor-based version: mutates the map in place while iterating.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Walk entries that may overlap [start, start+len).
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read it from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write ( skip is offset of read position within it ).
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Tail beyond the last overlapping write comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
915
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage ( write-buffered ).
    stg: WriteBuffer,
    /// Temporary storage for updates during commit ( write-buffered ).
    upd: WriteBuffer,
    /// Map of writes not yet committed.
    map: WMap,
    /// List of writes being committed ( populated by commit phase 1, drained by phase 2 ).
    list: GVec<(u64, DataSlice)>,
    /// Committed file size.
    size: u64,
}
928
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: GVec::new(),
            size,
        });
        // Replay any updates left over from a crash between commit phases.
        result.init();
        result
    }

    /// Apply outstanding updates ( crash recovery ).
    /// upd layout: u64 at 0 = end of update records ( 0 means none ),
    /// u64 at 8 = committed file size, then ( start, len, bytes ) records from offset 16.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            return;
        }
        // Sanity check that the update file is complete.
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf: GVec<u8> = gvec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Mark the update file as empty.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    /// Phase 1 durably records the updates in upd; phase 2 applies them to stg.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero ( upd not yet valid ).
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly
                    // ( a crash cannot corrupt data that did not exist before ).
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position, making the update file valid for recovery.
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written in phase 1.
                    self.stg.write(*start, v.all());
                }
            }
            self.list = GVec::new();
            self.stg.commit(size);
            // Mark the update file as empty again.
            self.upd.commit(0);
        }
    }
}
1021
1022impl BasicStorage for BasicAtomicFile {
1023    fn commit(&mut self, size: u64) {
1024        self.commit_phase(size, 1);
1025        self.commit_phase(size, 2);
1026        self.size = size;
1027    }
1028
1029    fn size(&self) -> u64 {
1030        self.size
1031    }
1032
1033    fn read(&self, start: u64, data: &mut [u8]) {
1034        self.map.read(start, data, &*self.stg.stg);
1035    }
1036
1037    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1038        self.map.write(start, data, off, len);
1039    }
1040
1041    fn write(&mut self, start: u64, data: &[u8]) {
1042        let len = data.len();
1043        let d = Arc::new(data.to_vec());
1044        self.write_data(start, d, 0, len);
1045    }
1046}
1047
/// Optimized implementation of [Storage] ( unix only ).
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// Current logical file size, shared between clones.
    size: Arc<Mutex<u64>>,
    /// Underlying file; positional I/O keeps clones independent.
    f: fs::File,
}
#[cfg(target_family = "unix")]
impl UnixFileStorage {
    /// Construct from filename. The file is created if it does not exist.
    ///
    /// Panics if the file cannot be opened or its metadata cannot be read.
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // metadata().len() gets the size without moving the file cursor
        // ( the previous seek-based approach also forced `f` to be mutable ).
        let size = f.metadata().unwrap().len();
        let size = Arc::new(Mutex::new(size));
        Box::new(Self { size, f })
    }
}
1070
1071#[cfg(target_family = "unix")]
1072impl Storage for UnixFileStorage {
1073    fn clone(&self) -> Box<dyn Storage> {
1074        Box::new(Self {
1075            size: self.size.clone(),
1076            f: self.f.try_clone().unwrap(),
1077        })
1078    }
1079}
1080
1081#[cfg(target_family = "unix")]
1082use std::os::unix::fs::FileExt;
1083
1084#[cfg(target_family = "unix")]
1085impl BasicStorage for UnixFileStorage {
1086    fn read(&self, start: u64, data: &mut [u8]) {
1087        let _ = self.f.read_at(data, start);
1088    }
1089
1090    fn write(&mut self, start: u64, data: &[u8]) {
1091        let _ = self.f.write_at(data, start);
1092    }
1093
1094    fn size(&self) -> u64 {
1095        *self.size.lock().unwrap()
1096    }
1097
1098    fn commit(&mut self, size: u64) {
1099        *self.size.lock().unwrap() = size;
1100        self.f.set_len(size).unwrap();
1101        self.f.sync_all().unwrap();
1102    }
1103}
1104
/// Optimized implementation of [Storage] ( windows only ).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Current logical file size, shared between clones.
    size: Arc<Mutex<u64>>,
    /// Underlying file; positional I/O keeps clones independent.
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename. The file is created if it does not exist.
    ///
    /// Panics if the file cannot be opened or its metadata cannot be read.
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // metadata().len() gets the size without moving the file cursor
        // ( the previous seek-based approach also forced `f` to be mutable ).
        let size = f.metadata().unwrap().len();
        let size = Arc::new(Mutex::new(size));
        Box::new(Self { size, f })
    }
}
1127
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Clone shares the size counter and duplicates the OS file handle.
    fn clone(&self) -> Box<dyn Storage> {
        let f = self.f.try_clone().unwrap();
        let size = Arc::clone(&self.size);
        Box::new(Self { size, f })
    }
}
1137
1138#[cfg(target_family = "windows")]
1139use std::os::windows::fs::FileExt;
1140
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    fn read(&self, start: u64, data: &mut [u8]) {
        // seek_read may transfer fewer bytes than requested, so loop until the
        // buffer is full, EOF ( Ok(0) ) is reached, or an error occurs. On EOF
        // or error the remainder of `data` is left as supplied by the caller,
        // matching the previous best-effort behaviour.
        // NOTE(review): seek_read also moves the file cursor on Windows; this
        // is safe here because all access supplies explicit offsets.
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) | Err(_) => break,
                Ok(n) => done += n,
            }
        }
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        // seek_write may also be partial; loop so the whole buffer is written.
        // A silent short write would lose data in a way sync_all cannot detect.
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) | Err(_) => break,
                Ok(n) => done += n,
            }
        }
    }

    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    fn commit(&mut self, size: u64) {
        // Record the logical size, truncate/extend the file to match, and fsync.
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1161
/// Optimised Storage ( varies according to platform ) - recommended backing
/// storage for [AtomicFile]. On windows this is [WindowsFileStorage].
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Optimised Storage ( varies according to platform ) - recommended backing
/// storage for [AtomicFile]. On unix this is [UnixFileStorage].
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Optimised Storage ( varies according to platform ). On other platforms a
/// generic fallback is used.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Fast Storage for upd file ( varies according to platform ). On unix and
/// windows the optimised platform storage is used directly.
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Fast Storage for upd file ( varies according to platform ). On other
/// platforms a dedicated upd-file storage is used.
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1181
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 ).
///
/// Panics if TA is set but does not parse as a usize.
fn test_amount() -> usize {
    // map_or avoids the eager "1".to_string() allocation of the previous
    // unwrap_or form while preserving panic-on-malformed behaviour.
    std::env::var("TA").map_or(1, |ta| ta.parse().unwrap())
}
1187
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Idea of test: drive AtomicFile and a plain MemFile with identical random
    // operations and check that reads always agree.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut af = AtomicFile::new(MemFile::new(), MemFile::new());
        // let mut af = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut reference = MemFile::new();

        for _ in 0..1000 * ta {
            // Random offset in 0..100 and length in 1..=20.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let do_write: bool = rng.r#gen();
            if do_write {
                // Write the same random bytes to both files.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let byte: u8 = rng.r#gen::<u8>();
                    bytes.push(byte);
                }
                af.write(off as u64, &bytes);
                reference.write(off as u64, &bytes);
            } else {
                // Both files must read back identical data.
                let mut got_af = vec![0; len];
                let mut got_ref = vec![0; len];
                af.read(off as u64, &mut got_af);
                reference.read(off as u64, &mut got_ref);
                assert!(got_af == got_ref);
            }
            // Occasionally commit both files at the same size.
            if rng.r#gen::<usize>() % 50 == 0 {
                af.commit(200);
                reference.commit(200);
            }
        }
    }
}