Skip to main content

atom_file/
lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4//!
5//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
6//!
7//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.
8//!
9//!# Features
10//!
11//! This crate supports the following cargo features:
12//! - `pstd` : Use pstd crate for `BTreeMap` (allocated in `GTemp`).
13//! - `unsafe-optim` : Enable unsafe optimisations in release mode.
14
15#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24    VecA,
25    collections::{BTreeMapA, btree_map::CustomTuning},
26    localalloc::GTemp,
27    veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
33#[cfg(feature = "pstd")]
34type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;
35
36#[cfg(feature = "pstd")]
37type GVec<T> = VecA<T, GTemp>;
38
39/// ```Arc<Vec<u8>>```
40pub type Data = Arc<Vec<u8>>;
41
42/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
43/// Performs commit asyncronously.
44///
45/// #Example
46///
47/// ```
48/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
49/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
50/// af.write( 0, &[1,2,3,4] );
51/// af.commit(4);
52/// af.wait_complete();
53/// ```
54///
55/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
56/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
57/// the updates to underlying storage have been applied.
58pub struct AtomicFile {
59    /// New updates are written here.
60    map: WMap,
61    /// Underlying file, with previous updates mapped.
62    cf: Arc<RwLock<CommitFile>>,
63    /// File size.
64    size: u64,
65    /// For sending update maps to be saved.
66    tx: std::sync::mpsc::Sender<(u64, WMap)>,
67    /// Held by update process while it is active.
68    busy: Arc<Mutex<()>>,
69    /// Limit on size of CommitFile map.
70    map_lim: usize,
71}
72
73impl AtomicFile {
74    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
75    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76        Self::new_with_limits(stg, upd, &Limits::default())
77    }
78
79    /// Construct Atomic file with specified limits.
80    pub fn new_with_limits(
81        stg: Box<dyn Storage>,
82        upd: Box<dyn BasicStorage>,
83        lim: &Limits,
84    ) -> Box<Self> {
85        let size = stg.size();
86        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.
91
92        // Start the thread which does save asyncronously.
93        let (cf1, busy1) = (cf.clone(), busy.clone());
94
95        std::thread::spawn(move || {
96            // Loop that recieves a map of updates and applies it to BasicAtomicFile.
97            while let Ok((size, map)) = rx.recv() {
98                let _lock = busy1.lock();
99                baf.map = map;
100                baf.commit(size);
101                cf1.write().unwrap().done_one();
102            }
103        });
104        Box::new(Self {
105            map: WMap::default(),
106            cf,
107            size,
108            tx,
109            busy,
110            map_lim: lim.map_lim,
111        })
112    }
113}
114
115impl Storage for AtomicFile {
116    fn clone(&self) -> Box<dyn Storage> {
117        panic!()
118    }
119}
120
121impl BasicStorage for AtomicFile {
122    fn commit(&mut self, size: u64) {
123        self.size = size;
124        if self.map.is_empty() {
125            return;
126        }
127        if self.cf.read().unwrap().map.len() > self.map_lim {
128            self.wait_complete();
129        }
130        let map = std::mem::take(&mut self.map);
131        let cf = &mut *self.cf.write().unwrap();
132        if cf.stop {
133            return;
134        } // Do not start new commit on program termination.
135        cf.todo += 1;
136        // Apply map of updates to CommitFile.
137        map.to_storage(cf);
138        // Send map of updates to thread to be written to underlying storage.
139        self.tx.send((size, map)).unwrap();
140    }
141
142    fn size(&self) -> u64 {
143        self.size
144    }
145
146    fn read(&self, start: u64, data: &mut [u8]) {
147        self.map.read(start, data, &*self.cf.read().unwrap());
148    }
149
150    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
151        self.map.write(start, data, off, len);
152    }
153
154    fn write(&mut self, start: u64, data: &[u8]) {
155        let len = data.len();
156        let d = Arc::new(data.to_vec());
157        self.write_data(start, d, 0, len);
158    }
159
160    fn wait_complete(&self) {
161        while self.cf.read().unwrap().busy() {
162            let _x = self.busy.lock();
163        }
164    }
165
166    fn shutdown(&mut self) {
167        self.cf.write().unwrap().stop = true; // Prevents new commits from being added.
168        while self.cf.read().unwrap().todo != 0 {
169            let _x = self.busy.lock();
170        }
171    }
172}
173
174struct CommitFile {
175    /// Buffered underlying storage.
176    stg: ReadBufStg<256>,
177    /// Map of committed updates.
178    map: WMap,
179    /// Number of outstanding unsaved commits.
180    todo: usize,
181    /// Flag to prevent new commits starting
182    stop: bool,
183}
184
185impl CommitFile {
186    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
187        Self {
188            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
189            map: WMap::default(),
190            todo: 0,
191            stop: false,
192        }
193    }
194
195    fn done_one(&mut self) {
196        self.todo -= 1;
197        if self.todo == 0 {
198            self.map = WMap::default();
199            self.stg.reset();
200        }
201    }
202
203    fn busy(&self) -> bool {
204        self.todo != 0 || self.stop
205    }
206}
207
208impl BasicStorage for CommitFile {
209    fn commit(&mut self, _size: u64) {
210        panic!()
211    }
212
213    fn size(&self) -> u64 {
214        panic!()
215    }
216
217    fn read(&self, start: u64, data: &mut [u8]) {
218        self.map.read(start, data, &self.stg);
219    }
220
221    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
222        self.map.write(start, data, off, len);
223    }
224
225    fn write(&mut self, _start: u64, _data: &[u8]) {
226        panic!()
227    }
228}
229
230/// Storage interface - BasicStorage is some kind of "file" storage.
231///
232/// read and write methods take a start which is a byte offset in the underlying file.
233pub trait BasicStorage: Send {
234    /// Get the size of the underlying storage.
235    /// Note : this is valid initially and after a commit but is not defined after write is called.
236    fn size(&self) -> u64;
237
238    /// Read data.
239    fn read(&self, start: u64, data: &mut [u8]);
240
241    /// Write byte slice to storage.
242    fn write(&mut self, start: u64, data: &[u8]);
243
244    /// Write byte Vec.
245    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
246        let len = data.len();
247        let d = Arc::new(data);
248        self.write_data(start, d, 0, len);
249    }
250
251    /// Write Data slice.
252    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
253        self.write(start, &data[off..off + len]);
254    }
255
256    /// Finish write transaction, size is new size of underlying storage.
257    fn commit(&mut self, size: u64);
258
259    /// Write u64.
260    fn write_u64(&mut self, start: u64, value: u64) {
261        self.write(start, &value.to_le_bytes());
262    }
263
264    /// Read u64.
265    fn read_u64(&self, start: u64) -> u64 {
266        let mut bytes = [0; 8];
267        self.read(start, &mut bytes);
268        u64::from_le_bytes(bytes)
269    }
270
271    /// Wait until current writes are complete.
272    fn wait_complete(&self) {}
273
274    /// Called on program termination.
275    fn shutdown(&mut self) {}
276}
277
278/// BasicStorage with Sync and clone.
279pub trait Storage: BasicStorage + Sync {
280    /// Clone.
281    fn clone(&self) -> Box<dyn Storage>;
282}
283
284/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
285#[derive(Default)]
286pub struct MemFile {
287    v: Arc<Mutex<Vec<u8>>>,
288}
289
290impl MemFile {
291    /// Get a new (boxed) MemFile.
292    pub fn new() -> Box<Self> {
293        Box::default()
294    }
295}
296
297impl Storage for MemFile {
298    fn clone(&self) -> Box<dyn Storage> {
299        Box::new(Self { v: self.v.clone() })
300    }
301}
302
303impl BasicStorage for MemFile {
304    fn size(&self) -> u64 {
305        let v = self.v.lock().unwrap();
306        v.len() as u64
307    }
308
309    fn read(&self, off: u64, bytes: &mut [u8]) {
310        let off = off as usize;
311        let len = bytes.len();
312        let mut v = self.v.lock().unwrap();
313        if off + len > v.len() {
314            v.resize(off + len, 0);
315        }
316        bytes.copy_from_slice(&v[off..off + len]);
317    }
318
319    fn write(&mut self, off: u64, bytes: &[u8]) {
320        let off = off as usize;
321        let len = bytes.len();
322        let mut v = self.v.lock().unwrap();
323        if off + len > v.len() {
324            v.resize(off + len, 0);
325        }
326        v[off..off + len].copy_from_slice(bytes);
327    }
328
329    fn commit(&mut self, size: u64) {
330        let mut v = self.v.lock().unwrap();
331        v.resize(size as usize, 0);
332    }
333}
334
335use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
336
337struct FileInner {
338    f: fs::File,
339}
340
341impl FileInner {
342    /// Construct from filename.
343    pub fn new(filename: &str) -> Self {
344        Self {
345            f: OpenOptions::new()
346                .read(true)
347                .write(true)
348                .create(true)
349                .truncate(false)
350                .open(filename)
351                .unwrap(),
352        }
353    }
354
355    fn size(&mut self) -> u64 {
356        self.f.seek(SeekFrom::End(0)).unwrap()
357    }
358
359    fn read(&mut self, off: u64, bytes: &mut [u8]) {
360        self.f.seek(SeekFrom::Start(off)).unwrap();
361        let _ = self.f.read(bytes).unwrap();
362    }
363
364    fn write(&mut self, off: u64, bytes: &[u8]) {
365        // The list of operating systems which auto-zero is likely more than this...research is todo.
366        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
367        {
368            let size = self.f.seek(SeekFrom::End(0)).unwrap();
369            if off > size {
370                self.f.set_len(off).unwrap();
371            }
372        }
373        self.f.seek(SeekFrom::Start(off)).unwrap();
374        let _ = self.f.write(bytes).unwrap();
375    }
376
377    fn commit(&mut self, size: u64) {
378        self.f.set_len(size).unwrap();
379        self.f.sync_all().unwrap();
380    }
381}
382
383/// For atomic upd file, if not unix or windows.
384pub struct UpdFileStorage {
385    file: Cell<Option<FileInner>>,
386}
387
388impl UpdFileStorage {
389    /// Construct from filename.
390    pub fn new(filename: &str) -> Box<Self> {
391        Box::new(Self {
392            file: Cell::new(Some(FileInner::new(filename))),
393        })
394    }
395}
396
397impl BasicStorage for UpdFileStorage {
398    fn size(&self) -> u64 {
399        let mut f = self.file.take().unwrap();
400        let result = f.size();
401        self.file.set(Some(f));
402        result
403    }
404    fn read(&self, off: u64, bytes: &mut [u8]) {
405        let mut f = self.file.take().unwrap();
406        f.read(off, bytes);
407        self.file.set(Some(f));
408    }
409
410    fn write(&mut self, off: u64, bytes: &[u8]) {
411        let mut f = self.file.take().unwrap();
412        f.write(off, bytes);
413        self.file.set(Some(f));
414    }
415
416    fn commit(&mut self, size: u64) {
417        let mut f = self.file.take().unwrap();
418        f.commit(size);
419        self.file.set(Some(f));
420    }
421}
422
423/// Simple implementation of [Storage] using [`std::fs::File`].
424pub struct SimpleFileStorage {
425    file: Arc<Mutex<FileInner>>,
426}
427
428impl SimpleFileStorage {
429    /// Construct from filename.
430    pub fn new(filename: &str) -> Box<Self> {
431        Box::new(Self {
432            file: Arc::new(Mutex::new(FileInner::new(filename))),
433        })
434    }
435}
436
437impl Storage for SimpleFileStorage {
438    fn clone(&self) -> Box<dyn Storage> {
439        Box::new(Self {
440            file: self.file.clone(),
441        })
442    }
443}
444
445impl BasicStorage for SimpleFileStorage {
446    fn size(&self) -> u64 {
447        self.file.lock().unwrap().size()
448    }
449
450    fn read(&self, off: u64, bytes: &mut [u8]) {
451        self.file.lock().unwrap().read(off, bytes);
452    }
453
454    fn write(&mut self, off: u64, bytes: &[u8]) {
455        self.file.lock().unwrap().write(off, bytes);
456    }
457
458    fn commit(&mut self, size: u64) {
459        self.file.lock().unwrap().commit(size);
460    }
461}
462
463/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
464pub struct AnyFileStorage {
465    filename: String,
466    files: Arc<Mutex<Vec<FileInner>>>,
467}
468
469impl AnyFileStorage {
470    /// Create new.
471    pub fn new(filename: &str) -> Box<Self> {
472        Box::new(Self {
473            filename: filename.to_owned(),
474            files: Arc::new(Mutex::new(Vec::new())),
475        })
476    }
477
478    fn get_file(&self) -> FileInner {
479        match self.files.lock().unwrap().pop() {
480            Some(f) => f,
481            _ => FileInner::new(&self.filename),
482        }
483    }
484
485    fn put_file(&self, f: FileInner) {
486        self.files.lock().unwrap().push(f);
487    }
488}
489
490impl Storage for AnyFileStorage {
491    fn clone(&self) -> Box<dyn Storage> {
492        Box::new(Self {
493            filename: self.filename.clone(),
494            files: self.files.clone(),
495        })
496    }
497}
498
499impl BasicStorage for AnyFileStorage {
500    fn size(&self) -> u64 {
501        let mut f = self.get_file();
502        let result = f.size();
503        self.put_file(f);
504        result
505    }
506
507    fn read(&self, off: u64, bytes: &mut [u8]) {
508        let mut f = self.get_file();
509        f.read(off, bytes);
510        self.put_file(f);
511    }
512
513    fn write(&mut self, off: u64, bytes: &[u8]) {
514        let mut f = self.get_file();
515        f.write(off, bytes);
516        self.put_file(f);
517    }
518
519    fn commit(&mut self, size: u64) {
520        let mut f = self.get_file();
521        f.commit(size);
522        self.put_file(f);
523    }
524}
525
526/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
527pub struct DummyFile {}
528impl DummyFile {
529    /// Construct.
530    pub fn new() -> Box<Self> {
531        Box::new(Self {})
532    }
533}
534
535impl Storage for DummyFile {
536    fn clone(&self) -> Box<dyn Storage> {
537        Self::new()
538    }
539}
540
541impl BasicStorage for DummyFile {
542    fn size(&self) -> u64 {
543        0
544    }
545
546    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
547
548    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
549
550    fn commit(&mut self, _size: u64) {}
551}
552
553/// Memory configuration limits for [`AtomicFile`].
554#[non_exhaustive]
555pub struct Limits {
556    /// Limit on size of commit write map, default is 5000.
557    pub map_lim: usize,
558    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
559    pub rbuf_mem: usize,
560    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
561    pub swbuf: usize,
562    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
563    pub uwbuf: usize,
564}
565
566impl Default for Limits {
567    fn default() -> Self {
568        Self {
569            map_lim: 5000,
570            rbuf_mem: 0x200000,
571            swbuf: 0x100000,
572            uwbuf: 0x100000,
573        }
574    }
575}
576
577/// Write Buffer.
578struct WriteBuffer {
579    /// Current write index into buf.
580    ix: usize,
581    /// Current file position.
582    pos: u64,
583    /// Underlying storage.
584    pub stg: Box<dyn BasicStorage>,
585    /// Buffer.
586    buf: Vec<u8>,
587}
588
589impl WriteBuffer {
590    /// Construct.
591    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
592        Self {
593            ix: 0,
594            pos: u64::MAX,
595            stg,
596            buf: vec![0; buf_size],
597        }
598    }
599
600    /// Write data to specified offset,
601    pub fn write(&mut self, off: u64, data: &[u8]) {
602        if self.pos + self.ix as u64 != off {
603            self.flush(off);
604        }
605        let mut done: usize = 0;
606        let mut todo: usize = data.len();
607        while todo > 0 {
608            let mut n: usize = self.buf.len() - self.ix;
609            if n == 0 {
610                self.flush(off + done as u64);
611                n = self.buf.len();
612            }
613            if n > todo {
614                n = todo;
615            }
616            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
617            todo -= n;
618            done += n;
619            self.ix += n;
620        }
621    }
622
623    fn flush(&mut self, new_pos: u64) {
624        if self.ix > 0 {
625            self.stg.write(self.pos, &self.buf[0..self.ix]);
626        }
627        self.ix = 0;
628        self.pos = new_pos;
629    }
630
631    /// Commit.
632    pub fn commit(&mut self, size: u64) {
633        self.flush(u64::MAX);
634        self.stg.commit(size);
635    }
636
637    /// Write u64.
638    pub fn write_u64(&mut self, start: u64, value: u64) {
639        self.write(start, &value.to_le_bytes());
640    }
641}
642
643/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
644///
645/// See implementation of AtomicFile for how this is used in conjunction with WMap.
646///
647/// N is buffer size.
648struct ReadBufStg<const N: usize> {
649    /// Underlying storage.
650    stg: Box<dyn Storage>,
651    /// Buffers.
652    buf: Mutex<ReadBuffer<N>>,
653    /// Read size that is considered small.
654    limit: usize,
655}
656
657impl<const N: usize> Drop for ReadBufStg<N> {
658    fn drop(&mut self) {
659        self.reset();
660    }
661}
662
663impl<const N: usize> ReadBufStg<N> {
664    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
665    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
666        Self {
667            stg,
668            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
669            limit,
670        }
671    }
672
673    /// Clears the buffers.
674    fn reset(&mut self) {
675        self.buf.lock().unwrap().reset();
676    }
677}
678
679impl<const N: usize> BasicStorage for ReadBufStg<N> {
680    /// Read data from storage.
681    fn read(&self, start: u64, data: &mut [u8]) {
682        if data.len() <= self.limit {
683            self.buf.lock().unwrap().read(&*self.stg, start, data);
684        } else {
685            self.stg.read(start, data);
686        }
687    }
688
689    /// Panics.
690    fn size(&self) -> u64 {
691        panic!()
692    }
693
694    /// Panics.
695    fn write(&mut self, _start: u64, _data: &[u8]) {
696        panic!();
697    }
698
699    /// Panics.
700    fn commit(&mut self, _size: u64) {
701        panic!();
702    }
703}
704
705struct ReadBuffer<const N: usize> {
706    /// Maps sector mumbers cached buffers.
707    map: HashMap<u64, Box<[u8; N]>>,
708    /// Maximum number of buffers.
709    max_buf: usize,
710}
711
712impl<const N: usize> ReadBuffer<N> {
713    fn new(max_buf: usize) -> Self {
714        Self {
715            map: HashMap::default(),
716            max_buf,
717        }
718    }
719
720    fn reset(&mut self) {
721        self.map.clear();
722    }
723
724    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
725        let mut done = 0;
726        while done < data.len() {
727            let off = off + done as u64;
728            let sector = off / N as u64;
729            let disp = (off % N as u64) as usize;
730            let amount = min(data.len() - done, N - disp);
731
732            let p = self.map.entry(sector).or_insert_with(|| {
733                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
734                stg.read(sector * N as u64, &mut *p);
735                p
736            });
737            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
738            done += amount;
739        }
740        if self.map.len() >= self.max_buf {
741            self.reset();
742        }
743    }
744}
745
746#[derive(Default)]
747/// Slice of Data to be written to storage.
748struct DataSlice {
749    /// Slice data.
750    pub data: Data,
751    /// Start of slice.
752    pub off: usize,
753    /// Length of slice.
754    pub len: usize,
755}
756
757impl DataSlice {
758    /// Get reference to the whole slice.
759    pub fn all(&self) -> &[u8] {
760        &self.data[self.off..self.off + self.len]
761    }
762    /// Get reference to part of slice.
763    pub fn part(&self, off: usize, len: usize) -> &[u8] {
764        &self.data[self.off + off..self.off + off + len]
765    }
766    /// Trim specified amount from start of slice.
767    pub fn trim(&mut self, trim: usize) {
768        self.off += trim;
769        self.len -= trim;
770    }
771    /// Take the data.
772    #[allow(dead_code)]
773    pub fn take(&mut self) -> Data {
774        std::mem::take(&mut self.data)
775    }
776}
777
778#[derive(Default)]
779/// Updateable store based on some underlying storage.
780struct WMap {
781    /// Map of writes. Key is the end of the slice.
782    map: BTreeMap<u64, DataSlice>,
783}
784
785impl WMap {
786    /// Is the map empty?
787    pub fn is_empty(&self) -> bool {
788        self.map.is_empty()
789    }
790
791    /// Number of key-value pairs in the map.
792    pub fn len(&self) -> usize {
793        self.map.len()
794    }
795
796    /// Take the map and convert it to a Vec.
797    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
798        let map = std::mem::take(&mut self.map);
799        let mut result = GVec::with_capacity(map.len());
800        for (end, v) in map {
801            let start = end - v.len as u64;
802            result.push((start, v));
803        }
804        result
805    }
806
807    /// Write the map into storage.
808    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
809        for (end, v) in self.map.iter() {
810            let start = end - v.len as u64;
811            stg.write_data(start, v.data.clone(), v.off, v.len);
812        }
813    }
814
815    #[cfg(not(feature = "pstd"))]
816    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
817    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
818        if len != 0 {
819            let (mut insert, mut remove) = (Vec::new(), Vec::new());
820            let end = start + len as u64;
821            for (ee, v) in self.map.range_mut(start + 1..) {
822                let ee = *ee;
823                let es = ee - v.len as u64; // Existing write Start.
824                if es >= end {
825                    // Existing write starts after end of new write, nothing to do.
826                    break;
827                } else if start <= es {
828                    if end < ee {
829                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
830                        v.trim((end - es) as usize);
831                        break;
832                    }
833                    // New write subsumes existing write entirely, remove existing write.
834                    remove.push(ee);
835                } else if end < ee {
836                    // New write starts in middle of existing write, ends before end of existing write,
837                    // put start of existing write in insert list, trim existing write.
838                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
839                    v.trim((end - es) as usize);
840                    break;
841                } else {
842                    // New write starts in middle of existing write, ends after existing write,
843                    // put start of existing write in insert list, remove existing write.
844                    insert.push((es, v.take(), v.off, (start - es) as usize));
845                    remove.push(ee);
846                }
847            }
848            for end in remove {
849                self.map.remove(&end);
850            }
851            for (start, data, off, len) in insert {
852                self.map
853                    .insert(start + len as u64, DataSlice { data, off, len });
854            }
855            self.map
856                .insert(start + len as u64, DataSlice { data, off, len });
857        }
858    }
859
860    #[cfg(feature = "pstd")]
861    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
862    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
863        if len != 0 {
864            let end = start + len as u64;
865            let mut c = self
866                .map
867                .lower_bound_mut(std::ops::Bound::Excluded(&start))
868                .with_mutable_key();
869            while let Some((eend, v)) = c.next() {
870                let ee = *eend;
871                let es = ee - v.len as u64; // Existing write Start.
872                if es >= end {
873                    // Existing write starts after end of new write, nothing to do.
874                    c.prev();
875                    break;
876                } else if start <= es {
877                    if end < ee {
878                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
879                        v.trim((end - es) as usize);
880                        c.prev();
881                        break;
882                    }
883                    // New write subsumes existing write entirely, remove existing write.
884                    c.remove_prev();
885                } else if end < ee {
886                    // New write starts in middle of existing write, ends before end of existing write,
887                    // trim existing write, insert start of existing write.
888                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
889                    v.trim((end - es) as usize);
890                    c.prev();
891                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
892                    break;
893                } else {
894                    // New write starts in middle of existing write, ends after existing write,
895                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
896                    v.len = (start - es) as usize;
897                    *eend = es + v.len as u64;
898                }
899            }
900            // Insert the new write.
901            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
902        }
903    }
904
905    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
906    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
907        let len = data.len();
908        if len != 0 {
909            let mut done = 0;
910            for (&end, v) in self.map.range(start + 1..) {
911                let es = end - v.len as u64; // Existing write Start.
912                let doff = start + done as u64;
913                if es > doff {
914                    // Read from underlying storage.
915                    let a = min(len - done, (es - doff) as usize);
916                    u.read(doff, &mut data[done..done + a]);
917                    done += a;
918                    if done == len {
919                        return;
920                    }
921                }
922                // Use existing write.
923                let skip = (start + done as u64 - es) as usize;
924                let a = min(len - done, v.len - skip);
925                data[done..done + a].copy_from_slice(v.part(skip, a));
926                done += a;
927                if done == len {
928                    return;
929                }
930            }
931            u.read(start + done as u64, &mut data[done..]);
932        }
933    }
934}
935
936/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
937pub struct BasicAtomicFile {
938    /// The main underlying storage.
939    stg: WriteBuffer,
940    /// Temporary storage for updates during commit.
941    upd: WriteBuffer,
942    /// Map of writes.
943    map: WMap,
944    /// List of writes.
945    list: GVec<(u64, DataSlice)>,
946    size: u64,
947    stop: bool,
948}
949
950impl BasicAtomicFile {
951    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
952    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
953        let size = stg.size();
954        let mut result = Box::new(Self {
955            stg: WriteBuffer::new(stg, lim.swbuf),
956            upd: WriteBuffer::new(upd, lim.uwbuf),
957            map: WMap::default(),
958            list: GVec::new(),
959            size,
960            stop: false,
961        });
962        result.init();
963        result
964    }
965
966    /// Apply outstanding updates.
967    fn init(&mut self) {
968        let end = self.upd.stg.read_u64(0);
969        let size = self.upd.stg.read_u64(8);
970        if end == 0 {
971            return;
972        }
973        assert!(end == self.upd.stg.size());
974        let mut pos = 16;
975        while pos < end {
976            let start = self.upd.stg.read_u64(pos);
977            pos += 8;
978            let len = self.upd.stg.read_u64(pos);
979            pos += 8;
980            let mut buf: GVec<u8> = gvec![0; len as usize];
981            self.upd.stg.read(pos, &mut buf);
982            pos += len;
983            self.stg.write(start, &buf);
984        }
985        self.stg.commit(size);
986        self.upd.commit(0);
987    }
988
989    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
990    pub fn commit_phase(&mut self, size: u64, phase: u8) {
991        if self.map.is_empty() && self.list.is_empty() {
992            return;
993        }
994        if phase == 1 {
995            self.list = self.map.convert_to_vec();
996
997            // Write the updates to upd.
998            // First set the end position to zero.
999            self.upd.write_u64(0, 0);
1000            self.upd.write_u64(8, size);
1001            self.upd.commit(16); // Not clear if this is necessary.
1002
1003            // Write the update records.
1004            let mut stg_written = false;
1005            let mut pos: u64 = 16;
1006            for (start, v) in self.list.iter() {
1007                let (start, len, data) = (*start, v.len as u64, v.all());
1008                if start >= self.size {
1009                    // Writes beyond current stg size can be written directly.
1010                    stg_written = true;
1011                    self.stg.write(start, data);
1012                } else {
1013                    self.upd.write_u64(pos, start);
1014                    pos += 8;
1015                    self.upd.write_u64(pos, len);
1016                    pos += 8;
1017                    self.upd.write(pos, data);
1018                    pos += len;
1019                }
1020            }
1021            if stg_written {
1022                self.stg.commit(size);
1023            }
1024            self.upd.commit(pos); // Not clear if this is necessary.
1025
1026            // Set the end position.
1027            self.upd.write_u64(0, pos);
1028            self.upd.write_u64(8, size);
1029            self.upd.commit(pos);
1030        } else {
1031            for (start, v) in self.list.iter() {
1032                if *start < self.size {
1033                    // Writes beyond current stg size have already been written.
1034                    self.stg.write(*start, v.all());
1035                }
1036            }
1037            self.list = GVec::new();
1038            self.stg.commit(size);
1039            self.upd.commit(0);
1040        }
1041    }
1042}
1043
1044impl BasicStorage for BasicAtomicFile {
1045    fn commit(&mut self, size: u64) {
1046        if self.stop {
1047            return;
1048        }
1049        self.commit_phase(size, 1);
1050        self.commit_phase(size, 2);
1051        self.size = size;
1052    }
1053
1054    fn size(&self) -> u64 {
1055        self.size
1056    }
1057
1058    fn read(&self, start: u64, data: &mut [u8]) {
1059        self.map.read(start, data, &*self.stg.stg);
1060    }
1061
1062    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1063        self.map.write(start, data, off, len);
1064    }
1065
1066    fn write(&mut self, start: u64, data: &[u8]) {
1067        let len = data.len();
1068        let d = Arc::new(data.to_vec());
1069        self.write_data(start, d, 0, len);
1070    }
1071
1072    fn shutdown(&mut self) {
1073        self.stop = true;
1074    }
1075}
1076
/// Optimized implementation of [Storage] (unix only).
///
/// Reads and writes are positional (`FileExt::read_at` / `write_at` style calls with an
/// explicit offset), so clones can share the file without coordinating a seek position.
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// Committed file size, shared by every clone of this storage.
    size: Arc<Mutex<u64>>,
    /// Handle to the underlying file.
    f: fs::File,
}
1083#[cfg(target_family = "unix")]
1084impl UnixFileStorage {
1085    /// Construct from filename.
1086    pub fn new(filename: &str) -> Box<Self> {
1087        let mut f = OpenOptions::new()
1088            .read(true)
1089            .write(true)
1090            .create(true)
1091            .truncate(false)
1092            .open(filename)
1093            .unwrap();
1094        let size = f.seek(SeekFrom::End(0)).unwrap();
1095        let size = Arc::new(Mutex::new(size));
1096        Box::new(Self { size, f })
1097    }
1098}
1099
1100#[cfg(target_family = "unix")]
1101impl Storage for UnixFileStorage {
1102    fn clone(&self) -> Box<dyn Storage> {
1103        Box::new(Self {
1104            size: self.size.clone(),
1105            f: self.f.try_clone().unwrap(),
1106        })
1107    }
1108}
1109
1110#[cfg(target_family = "unix")]
1111use std::os::unix::fs::FileExt;
1112
1113#[cfg(target_family = "unix")]
1114impl BasicStorage for UnixFileStorage {
1115    fn read(&self, start: u64, data: &mut [u8]) {
1116        let _ = self.f.read_at(data, start);
1117    }
1118
1119    fn write(&mut self, start: u64, data: &[u8]) {
1120        let _ = self.f.write_at(data, start);
1121    }
1122
1123    fn size(&self) -> u64 {
1124        *self.size.lock().unwrap()
1125    }
1126
1127    fn commit(&mut self, size: u64) {
1128        *self.size.lock().unwrap() = size;
1129        self.f.set_len(size).unwrap();
1130        self.f.sync_all().unwrap();
1131    }
1132}
1133
/// Optimized implementation of [Storage] (windows only).
///
/// Reads and writes use `FileExt::seek_read` / `seek_write`, each with an explicit offset.
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Committed file size, shared by every clone of this storage.
    size: Arc<Mutex<u64>>,
    /// Handle to the underlying file.
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename.
    ///
    /// The file is opened for reading and writing, created if it does not exist, and its
    /// existing contents are kept. The current end of file becomes the initial size.
    ///
    /// # Panics
    ///
    /// Panics, naming the file, if it cannot be opened or its size cannot be determined.
    pub fn new(filename: &str) -> Box<Self> {
        let mut f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap_or_else(|e| panic!("WindowsFileStorage: cannot open {filename}: {e}"));
        let size = f
            .seek(SeekFrom::End(0))
            .unwrap_or_else(|e| panic!("WindowsFileStorage: cannot get size of {filename}: {e}"));
        Box::new(Self {
            size: Arc::new(Mutex::new(size)),
            f,
        })
    }
}
1156
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Create another handle to the same file.
    ///
    /// The clone gets a new file handle from `File::try_clone` but shares the size cell, so
    /// a commit made through either handle is reflected in `size()` on both.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1166
1167#[cfg(target_family = "windows")]
1168use std::os::windows::fs::FileExt;
1169
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read `data.len()` bytes from file offset `start` into `data`.
    ///
    /// Short reads are retried until `data` is full. If end of file is reached first, the
    /// remaining bytes of `data` are left as the caller supplied them.
    ///
    /// # Panics
    ///
    /// Panics on an I/O error other than `ErrorKind::Interrupted` (which is retried), rather
    /// than silently returning a partially filled buffer.
    fn read(&self, start: u64, data: &mut [u8]) {
        let want = data.len();
        let mut done = 0;
        while done < want {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                // End of file: nothing more to read.
                Ok(0) => break,
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!(
                    "WindowsFileStorage: read of {want} bytes at offset {start} failed: {e}"
                ),
            }
        }
    }

    /// Write all of `data` at file offset `start`, continuing after short writes.
    ///
    /// # Panics
    ///
    /// Panics if the write fails or makes no progress, so that a lost write can never be
    /// followed by an apparently successful `commit`.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            let at = start + done as u64;
            match self.f.seek_write(&data[done..], at) {
                Ok(0) => panic!("WindowsFileStorage: write at offset {at} made no progress"),
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!(
                    "WindowsFileStorage: write of {} bytes at offset {at} failed: {e}",
                    data.len() - done
                ),
            }
        }
    }

    /// Committed size of the file (shared with every clone of this storage).
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record `size`, set the file length to it, and flush data and metadata to disk.
    ///
    /// # Panics
    ///
    /// Panics if the size mutex is poisoned or if resizing or syncing the file fails.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1190
1191/// Optimised Storage ( varies according to platform ).
1192#[cfg(target_family = "windows")]
1193pub type MultiFileStorage = WindowsFileStorage;
1194
1195/// Optimised Storage ( varies according to platform ).
1196#[cfg(target_family = "unix")]
1197pub type MultiFileStorage = UnixFileStorage;
1198
1199/// Optimised Storage ( varies according to platform ).
1200#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1201pub type MultiFileStorage = AnyFileStorage;
1202
1203/// Fast Storage for upd file ( varies according to platform ).
1204#[cfg(any(target_family = "windows", target_family = "unix"))]
1205pub type FastFileStorage = MultiFileStorage;
1206
1207/// Fast Storage for upd file ( varies according to platform ).
1208#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1209pub type FastFileStorage = UpdFileStorage;
1210
#[cfg(test)]
/// Amount of testing to perform, taken from the environment variable `TA` (1 when unset).
///
/// Panics if `TA` is set but does not parse as a `usize`.
fn test_amount() -> usize {
    match std::env::var("TA") {
        Ok(text) => text.parse().unwrap(),
        Err(_) => 1,
    }
}
1216
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // MemFile acts as the reference model: after the same random sequence of writes,
    // reads and commits, AtomicFile must return exactly the same bytes.
    // BasicAtomicFile can be exercised the same way by constructing `subject` with
    // `BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default())`.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _round in 0..100 {
        let mut subject = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut model = MemFile::new();

        for _step in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            let is_write: bool = rng.r#gen();
            if is_write {
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                subject.write(off, &bytes);
                model.write(off, &bytes);
            } else {
                let mut got = vec![0; len];
                let mut want = vec![0; len];
                subject.read(off, &mut got);
                model.read(off, &mut want);
                assert!(got == want);
            }
            if rng.r#gen::<usize>() % 50 == 0 {
                subject.commit(200);
                model.commit(200);
            }
        }
    }
}