Skip to main content

atom_file/
lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4//!
5//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
6//!
7//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.
8//!
//! # Features
10//!
11//! This crate supports the following cargo features:
12//! - `pstd` : Use pstd crate for `BTreeMap` (allocated in `GTemp`).
13//! - `unsafe-optim` : Enable unsafe optimisations in release mode.
14
15#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24    VecA,
25    collections::{BTreeMapA, btree_map::CustomTuning},
26    localalloc::{GTemp, Temp},
27    veca as tvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::vec as tvec;
32
33#[cfg(feature = "pstd")]
34type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;
35
36#[cfg(not(feature = "pstd"))]
37use std::collections::BTreeMap;
38
39#[cfg(feature = "pstd")]
40type GVec<T> = VecA<T, GTemp>;
41
42#[cfg(not(feature = "pstd"))]
43type GVec<T> = Vec<T>;
44
45#[cfg(feature = "pstd")]
46type TVec<T> = VecA<T, Temp>;
47
48#[cfg(not(feature = "pstd"))]
49type TVec<T> = Vec<T>;
50
/// ```Arc<Vec<u8>>``` - reference-counted byte data, cheap to clone.
pub type Data = Arc<Vec<u8>>;
53
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates are written here.
    map: WMap,
    /// Underlying file, with previous updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size.
    size: u64,
    /// For sending update maps to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by update process while it is active.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map.
    map_lim: usize,
}
84
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // The loop ends when the sender (AtomicFile) is dropped and the channel closes.
            while let Ok((size, map)) = rx.recv() {
                let _lock = busy1.lock(); // Signals to wait_complete that a save is in progress.
                baf.map = map;
                baf.commit(size);
                cf1.write().unwrap().done_one(); // One less outstanding commit.
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
126
impl Storage for AtomicFile {
    /// Panics - cloning an AtomicFile is not supported.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
132
impl BasicStorage for AtomicFile {
    /// Commit the current batch of writes: apply them to the CommitFile map and
    /// send them to the save thread to be written to underlying storage.
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Bound memory use: if the CommitFile map has grown too large, wait for
        // outstanding saves to finish ( which resets the CommitFile map ).
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    /// Read, checking the latest (uncommitted) writes first, then the CommitFile.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    /// Loop until the save thread has no outstanding commits; acquiring busy
    /// blocks while the save thread is processing a batch.
    fn wait_complete(&self) {
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
175
/// Committed updates pending application to underlying storage, plus buffered reads.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
184
185impl CommitFile {
186    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
187        Self {
188            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
189            map: WMap::default(),
190            todo: 0,
191        }
192    }
193
194    fn done_one(&mut self) {
195        self.todo -= 1;
196        if self.todo == 0 {
197            self.map = WMap::default();
198            self.stg.reset();
199        }
200    }
201}
202
impl BasicStorage for CommitFile {
    /// Panics - CommitFile is never committed directly.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Panics - size is not tracked by CommitFile.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Read, checking committed writes first, then buffered underlying storage.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &self.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Panics - use write_data instead.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
224
/// Storage interface - BasicStorage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait BasicStorage: Send {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec ( default wraps it in an Arc and delegates to write_data ).
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice ( default delegates to write ).
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 in little-endian representation.
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 from little-endian representation.
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Wait until current writes are complete ( default is a no-op ).
    fn wait_complete(&self) {}
}
269
/// BasicStorage with Sync and clone.
pub trait Storage: BasicStorage + Sync {
    /// Clone ( implementations typically share the same underlying storage ).
    fn clone(&self) -> Box<dyn Storage>;
}
275
/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
#[derive(Default)]
pub struct MemFile {
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Allocate a fresh, empty MemFile on the heap.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
288
289impl Storage for MemFile {
290    fn clone(&self) -> Box<dyn Storage> {
291        Box::new(Self { v: self.v.clone() })
292    }
293}
294
impl BasicStorage for MemFile {
    fn size(&self) -> u64 {
        let v = self.v.lock().unwrap();
        v.len() as u64
    }

    /// Read bytes at off. Note: reading past the end first grows the vector
    /// with zeros, so out-of-range reads yield zero bytes ( and increase size ).
    fn read(&self, off: u64, bytes: &mut [u8]) {
        let off = off as usize;
        let len = bytes.len();
        let mut v = self.v.lock().unwrap();
        if off + len > v.len() {
            v.resize(off + len, 0);
        }
        bytes.copy_from_slice(&v[off..off + len]);
    }

    /// Write bytes at off, growing the vector as needed.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        let off = off as usize;
        let len = bytes.len();
        let mut v = self.v.lock().unwrap();
        if off + len > v.len() {
            v.resize(off + len, 0);
        }
        v[off..off + len].copy_from_slice(bytes);
    }

    /// Truncate ( or zero-extend ) the vector to size.
    fn commit(&mut self, size: u64) {
        let mut v = self.v.lock().unwrap();
        v.resize(size as usize, 0);
    }
}
326
327use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
328
/// Shared implementation of file storage based on [`std::fs::File`].
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Construct from filename, creating the file if it does not exist.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// File size in bytes.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read bytes at offset off. Loops to handle short reads; at end-of-file the
    /// remaining bytes are left unmodified ( callers in this crate pass zero-filled buffers ).
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                break; // End of file.
            }
            done += n;
        }
    }

    /// Write bytes at offset off.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all retries on partial writes; a bare write() could silently drop bytes.
        self.f.write_all(bytes).unwrap();
    }

    /// Truncate ( or extend ) the file to size and sync contents to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
374
/// For atomic upd file, if not unix or windows.
pub struct UpdFileStorage {
    /// File handle, temporarily taken out of the Cell while an operation is in progress.
    file: Cell<Option<FileInner>>,
}

impl UpdFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: Cell::new(Some(FileInner::new(filename))),
        })
    }
}
388
impl BasicStorage for UpdFileStorage {
    // Each method takes the FileInner out of the Cell, operates on it, then puts it
    // back. NOTE(review): the unwrap panics if a method is re-entered while the file
    // is taken out - assumes non-reentrant use; confirm against callers.
    fn size(&self) -> u64 {
        let mut f = self.file.take().unwrap();
        let result = f.size();
        self.file.set(Some(f));
        result
    }
    fn read(&self, off: u64, bytes: &mut [u8]) {
        let mut f = self.file.take().unwrap();
        f.read(off, bytes);
        self.file.set(Some(f));
    }

    fn write(&mut self, off: u64, bytes: &[u8]) {
        let mut f = self.file.take().unwrap();
        f.write(off, bytes);
        self.file.set(Some(f));
    }

    fn commit(&mut self, size: u64) {
        let mut f = self.file.take().unwrap();
        f.commit(size);
        self.file.set(Some(f));
    }
}
414
415/// Simple implementation of [Storage] using [`std::fs::File`].
416pub struct SimpleFileStorage {
417    file: Arc<Mutex<FileInner>>,
418}
419
420impl SimpleFileStorage {
421    /// Construct from filename.
422    pub fn new(filename: &str) -> Box<Self> {
423        Box::new(Self {
424            file: Arc::new(Mutex::new(FileInner::new(filename))),
425        })
426    }
427}
428
429impl Storage for SimpleFileStorage {
430    fn clone(&self) -> Box<dyn Storage> {
431        Box::new(Self {
432            file: self.file.clone(),
433        })
434    }
435}
436
437impl BasicStorage for SimpleFileStorage {
438    fn size(&self) -> u64 {
439        self.file.lock().unwrap().size()
440    }
441
442    fn read(&self, off: u64, bytes: &mut [u8]) {
443        self.file.lock().unwrap().read(off, bytes);
444    }
445
446    fn write(&mut self, off: u64, bytes: &[u8]) {
447        self.file.lock().unwrap().write(off, bytes);
448    }
449
450    fn commit(&mut self, size: u64) {
451        self.file.lock().unwrap().commit(size);
452    }
453}
454
455/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
456pub struct AnyFileStorage {
457    filename: String,
458    files: Arc<Mutex<Vec<FileInner>>>,
459}
460
461impl AnyFileStorage {
462    /// Create new.
463    pub fn new(filename: &str) -> Box<Self> {
464        Box::new(Self {
465            filename: filename.to_owned(),
466            files: Arc::new(Mutex::new(Vec::new())),
467        })
468    }
469
470    fn get_file(&self) -> FileInner {
471        match self.files.lock().unwrap().pop() {
472            Some(f) => f,
473            _ => FileInner::new(&self.filename),
474        }
475    }
476
477    fn put_file(&self, f: FileInner) {
478        self.files.lock().unwrap().push(f);
479    }
480}
481
482impl Storage for AnyFileStorage {
483    fn clone(&self) -> Box<dyn Storage> {
484        Box::new(Self {
485            filename: self.filename.clone(),
486            files: self.files.clone(),
487        })
488    }
489}
490
491impl BasicStorage for AnyFileStorage {
492    fn size(&self) -> u64 {
493        let mut f = self.get_file();
494        let result = f.size();
495        self.put_file(f);
496        result
497    }
498
499    fn read(&self, off: u64, bytes: &mut [u8]) {
500        let mut f = self.get_file();
501        f.read(off, bytes);
502        self.put_file(f);
503    }
504
505    fn write(&mut self, off: u64, bytes: &[u8]) {
506        let mut f = self.get_file();
507        f.write(off, bytes);
508        self.put_file(f);
509    }
510
511    fn commit(&mut self, size: u64) {
512        let mut f = self.get_file();
513        f.commit(size);
514        self.put_file(f);
515    }
516}
517
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
pub struct DummyFile {}

impl DummyFile {
    /// Construct a boxed DummyFile.
    pub fn new() -> Box<Self> {
        Box::new(DummyFile {})
    }
}
526
527impl Storage for DummyFile {
528    fn clone(&self) -> Box<dyn Storage> {
529        Self::new()
530    }
531}
532
impl BasicStorage for DummyFile {
    /// Always reports size zero.
    fn size(&self) -> u64 {
        0
    }

    /// No-op: the buffer is returned unchanged.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// No-op: written data is discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// No-op.
    fn commit(&mut self, _size: u64) {}
}
544
/// Memory configuration limits for [`AtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    /// Defaults documented on the individual fields.
    fn default() -> Self {
        Limits {
            map_lim: 5000,
            rbuf_mem: 2 * 1024 * 1024,
            swbuf: 1024 * 1024,
            uwbuf: 1024 * 1024,
        }
    }
}
568
/// Write Buffer - coalesces sequential writes before passing them to underlying storage.
struct WriteBuffer {
    /// Current write index into buf.
    ix: usize,
    /// Current file position.
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}

impl WriteBuffer {
    /// Construct. pos starts at u64::MAX so the first write repositions via flush
    /// ( which writes nothing, since the buffer is empty ).
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset,
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // Non-sequential write: flush buffered bytes and reposition to off.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // n = free space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: write it out, continuing at the current offset.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write any buffered bytes to storage, then reposition to new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Flush buffered bytes and commit underlying storage at the given size.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 in little-endian representation.
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
634
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers.
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    /// Release the cached sector buffers when dropped.
    fn drop(&mut self) {
        self.reset();
    }
}

impl<const N: usize> ReadBufStg<N> {
    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Clears the buffers ( called when cached data may be stale ).
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}
670
impl<const N: usize> BasicStorage for ReadBufStg<N> {
    /// Read data from storage. Small reads ( up to limit bytes ) go through the
    /// sector cache; larger reads bypass it and hit the underlying storage directly.
    fn read(&self, start: u64, data: &mut [u8]) {
        if data.len() <= self.limit {
            self.buf.lock().unwrap().read(&*self.stg, start, data);
        } else {
            self.stg.read(start, data);
        }
    }

    /// Panics.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Panics.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!();
    }

    /// Panics.
    fn commit(&mut self, _size: u64) {
        panic!();
    }
}
696
/// Cache of fixed-size sectors read from underlying storage.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}

impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read via the sector cache, fetching whole sectors from stg on a cache miss.
    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64; // Shadowed: offset of the next unread byte.
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            // Bytes available from this sector ( may be less than requested ).
            let amount = min(data.len() - done, N - disp);

            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction: once the cache reaches max_buf sectors, drop everything.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
737
738#[derive(Default)]
739/// Slice of Data to be written to storage.
740struct DataSlice {
741    /// Slice data.
742    pub data: Data,
743    /// Start of slice.
744    pub off: usize,
745    /// Length of slice.
746    pub len: usize,
747}
748
749impl DataSlice {
750    /// Get reference to the whole slice.
751    pub fn all(&self) -> &[u8] {
752        &self.data[self.off..self.off + self.len]
753    }
754    /// Get reference to part of slice.
755    pub fn part(&self, off: usize, len: usize) -> &[u8] {
756        &self.data[self.off + off..self.off + off + len]
757    }
758    /// Trim specified amount from start of slice.
759    pub fn trim(&mut self, trim: usize) {
760        self.off += trim;
761        self.len -= trim;
762    }
763    /// Take the data.
764    #[allow(dead_code)]
765    pub fn take(&mut self) -> Data {
766        std::mem::take(&mut self.data)
767    }
768}
769
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end of the slice ( start + len ).
    map: BTreeMap<u64, DataSlice>,
}
776
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs.
    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = GVec::with_capacity(map.len());
        for (end, v) in map {
            // Map keys are slice ends; recover the start offset.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only existing writes ending after `start` can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first existing write ending after `start`.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Iterate over writes ending after `start`, in offset order.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Tail beyond the last mapped write: read from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
927
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes.
    list: GVec<(u64, DataSlice)>,
    /// Committed file size.
    size: u64,
}
940
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: GVec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates ( recovery after an interrupted commit ).
    ///
    /// Layout of the upd file: bytes [0..8) hold the end position of the update
    /// records ( zero when there is no outstanding commit ), bytes [8..16) hold
    /// the committed size, then records of ( start: u64, len: u64, data ) follow.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // No outstanding commit to replay.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf: TVec<u8> = tvec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        self.upd.commit(0); // Mark the upd file as empty.
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this makes the saved records valid for recovery ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list = GVec::new();
            self.stg.commit(size);
            self.upd.commit(0); // Mark the upd file as empty again.
        }
    }
}
1033
1034impl BasicStorage for BasicAtomicFile {
1035    fn commit(&mut self, size: u64) {
1036        self.commit_phase(size, 1);
1037        self.commit_phase(size, 2);
1038        self.size = size;
1039    }
1040
1041    fn size(&self) -> u64 {
1042        self.size
1043    }
1044
1045    fn read(&self, start: u64, data: &mut [u8]) {
1046        self.map.read(start, data, &*self.stg.stg);
1047    }
1048
1049    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1050        self.map.write(start, data, off, len);
1051    }
1052
1053    fn write(&mut self, start: u64, data: &[u8]) {
1054        let len = data.len();
1055        let d = Arc::new(data.to_vec());
1056        self.write_data(start, d, 0, len);
1057    }
1058}
1059
/// Optimized implementation of [Storage] ( unix only ).
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    // Current (committed) file size in bytes, shared between clones.
    size: Arc<Mutex<u64>>,
    // Underlying file; accessed with positional I/O (FileExt), so clones
    // sharing the handle do not race on a seek position.
    f: fs::File,
}
1066#[cfg(target_family = "unix")]
1067impl UnixFileStorage {
1068    /// Construct from filename.
1069    pub fn new(filename: &str) -> Box<Self> {
1070        let mut f = OpenOptions::new()
1071            .read(true)
1072            .write(true)
1073            .create(true)
1074            .truncate(false)
1075            .open(filename)
1076            .unwrap();
1077        let size = f.seek(SeekFrom::End(0)).unwrap();
1078        let size = Arc::new(Mutex::new(size));
1079        Box::new(Self { size, f })
1080    }
1081}
1082
1083#[cfg(target_family = "unix")]
1084impl Storage for UnixFileStorage {
1085    fn clone(&self) -> Box<dyn Storage> {
1086        Box::new(Self {
1087            size: self.size.clone(),
1088            f: self.f.try_clone().unwrap(),
1089        })
1090    }
1091}
1092
1093#[cfg(target_family = "unix")]
1094use std::os::unix::fs::FileExt;
1095
1096#[cfg(target_family = "unix")]
1097impl BasicStorage for UnixFileStorage {
1098    fn read(&self, start: u64, data: &mut [u8]) {
1099        let _ = self.f.read_at(data, start);
1100    }
1101
1102    fn write(&mut self, start: u64, data: &[u8]) {
1103        let _ = self.f.write_at(data, start);
1104    }
1105
1106    fn size(&self) -> u64 {
1107        *self.size.lock().unwrap()
1108    }
1109
1110    fn commit(&mut self, size: u64) {
1111        *self.size.lock().unwrap() = size;
1112        self.f.set_len(size).unwrap();
1113        self.f.sync_all().unwrap();
1114    }
1115}
1116
/// Optimized implementation of [Storage] ( windows only ).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    // Current (committed) file size in bytes, shared between clones.
    size: Arc<Mutex<u64>>,
    // Underlying file; accessed with positional I/O (seek_read/seek_write).
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename, creating the file if it does not exist.
    ///
    /// # Panics
    /// Panics if the file cannot be opened or its metadata cannot be read.
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // Query the length via metadata instead of seeking to the end:
        // no mutable handle needed and the file cursor is left untouched.
        let size = f.metadata().unwrap().len();
        Box::new(Self {
            size: Arc::new(Mutex::new(size)),
            f,
        })
    }
}
1139
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// A clone shares the size counter and duplicates the OS file handle.
    fn clone(&self) -> Box<dyn Storage> {
        let f = self.f.try_clone().unwrap();
        let size = Arc::clone(&self.size);
        Box::new(Self { size, f })
    }
}
1149
1150#[cfg(target_family = "windows")]
1151use std::os::windows::fs::FileExt;
1152
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read `data.len()` bytes starting at `start`.
    ///
    /// A single `seek_read` may return fewer bytes than requested, so loop
    /// until the buffer is full or EOF. Bytes past EOF are left untouched
    /// in the caller's buffer. I/O errors are swallowed, matching the
    /// previous behaviour of this impl.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // EOF
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(_) => break,
            }
        }
    }

    /// Write all of `data` at `start`.
    ///
    /// `seek_write` may write only part of the buffer; loop until every
    /// byte has been written (windows FileExt has no write_all variant).
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => break,
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(_) => break,
            }
        }
    }

    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Persist: record the new size, truncate/extend the file, then sync.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1173
/// Optimised Storage ( varies according to platform ).
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Optimised Storage ( varies according to platform ).
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Optimised Storage ( varies according to platform ).
/// Fallback for platforms that are neither unix nor windows.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Fast Storage for upd file ( varies according to platform ).
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Fast Storage for upd file ( varies according to platform ).
/// Fallback for platforms that are neither unix nor windows.
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1193
#[cfg(test)]
/// Get amount of testing from environment variable TA (defaults to 1).
///
/// # Panics
/// Panics if TA is set but is not a non-negative integer.
fn test_amount() -> usize {
    // unwrap_or_else avoids allocating the default string when TA is set.
    std::env::var("TA")
        .unwrap_or_else(|_| "1".to_string())
        .parse()
        .expect("TA must be a non-negative integer")
}
1199
#[test]
fn test_atomic_file() {
    use rand::Rng;
    /* Differential test: AtomicFile must be observably identical to a plain MemFile. */

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut af = AtomicFile::new(MemFile::new(), MemFile::new());
        // let mut af = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut reference = MemFile::new();

        for _ in 0..1000 * ta {
            let off: usize = rng.r#gen::<usize>() % 100;
            let len = 1 + rng.r#gen::<usize>() % 20;
            let write_op: bool = rng.r#gen();
            if write_op {
                // Same random bytes go to both files.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                af.write(off as u64, &bytes);
                reference.write(off as u64, &bytes);
            } else {
                // Read the same range from both and compare.
                let mut got_af = vec![0; len];
                let mut got_ref = vec![0; len];
                af.read(off as u64, &mut got_af);
                reference.read(off as u64, &mut got_ref);
                assert!(got_af == got_ref);
            }
            // Occasionally commit both files in lock-step.
            if rng.r#gen::<usize>() % 50 == 0 {
                af.commit(200);
                reference.commit(200);
            }
        }
    }
}