Skip to main content

atom_file/
lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4//!
5//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
6//!
7//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.
8//!
9//!# Features
10//!
11//! This crate supports the following cargo features:
12//! - `pstd` : Use pstd crate for `BTreeMap` (allocated in `GTemp`).
13//! - `unsafe-optim` : Enable unsafe optimisations in release mode.
14
15#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24    VecA,
25    collections::{BTreeMapA, btree_map::CustomTuning},
26    localalloc::GTemp,
27    veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
33#[cfg(feature = "pstd")]
34type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;
35
36#[cfg(feature = "pstd")]
37type GVec<T> = VecA<T, GTemp>;
38
/// Shared, reference-counted byte buffer: `Arc<Vec<u8>>`.
///
/// Used by `write_data` so that a buffer can be handed to storage without copying the bytes.
pub type Data = Arc<Vec<u8>>;
41
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates are written here.
    map: WMap,
    /// Underlying file, with previous updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size.
    size: u64,
    /// For sending update maps to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Locked by the save thread while it applies one commit; `wait_complete` blocks on this lock.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map.
    map_lim: usize,
}
72
73impl AtomicFile {
74    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
75    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76        Self::new_with_limits(stg, upd, &Limits::default())
77    }
78
79    /// Construct Atomic file with specified limits.
80    pub fn new_with_limits(
81        stg: Box<dyn Storage>,
82        upd: Box<dyn BasicStorage>,
83        lim: &Limits,
84    ) -> Box<Self> {
85        let size = stg.size();
86        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.
91
92        // Start the thread which does save asyncronously.
93        let (cf1, busy1) = (cf.clone(), busy.clone());
94
95        std::thread::spawn(move || {
96            // Loop that recieves a map of updates and applies it to BasicAtomicFile.
97            while let Ok((size, map)) = rx.recv() {
98                let _lock = busy1.lock();
99                baf.map = map;
100                baf.commit(size);
101                cf1.write().unwrap().done_one();
102            }
103        });
104        Box::new(Self {
105            map: WMap::default(),
106            cf,
107            size,
108            tx,
109            busy,
110            map_lim: lim.map_lim,
111        })
112    }
113}
114
impl Storage for AtomicFile {
    /// Cloning is not supported for `AtomicFile`.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
120
121impl BasicStorage for AtomicFile {
122    fn commit(&mut self, size: u64) {
123        self.size = size;
124        if self.map.is_empty() {
125            return;
126        }
127        if self.cf.read().unwrap().map.len() > self.map_lim {
128            self.wait_complete();
129        }
130        let map = std::mem::take(&mut self.map);
131        let cf = &mut *self.cf.write().unwrap();
132        if cf.stop { return; } // Do not start new commit on program termination.
133        cf.todo += 1;
134        // Apply map of updates to CommitFile.
135        map.to_storage(cf);
136        // Send map of updates to thread to be written to underlying storage.
137        self.tx.send((size, map)).unwrap();
138    }
139
140    fn size(&self) -> u64 {
141        self.size
142    }
143
144    fn read(&self, start: u64, data: &mut [u8]) {
145        self.map.read(start, data, &*self.cf.read().unwrap());
146    }
147
148    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
149        self.map.write(start, data, off, len);
150    }
151
152    fn write(&mut self, start: u64, data: &[u8]) {
153        let len = data.len();
154        let d = Arc::new(data.to_vec());
155        self.write_data(start, d, 0, len);
156    }
157
158    fn wait_complete(&self) {
159       while self.cf.read().unwrap().todo != 0 {
160           let _x = self.busy.lock();
161       }
162    }   
163
164    fn shutdown(&mut self) {
165        self.cf.write().unwrap().stop = true; // Prevents new commits from being added.
166        self.wait_complete(); // Wait for existing commmits to complete.
167    }       
168}
169
170struct CommitFile {
171    /// Buffered underlying storage.
172    stg: ReadBufStg<256>,
173    /// Map of committed updates.
174    map: WMap,
175    /// Number of outstanding unsaved commits.
176    todo: usize,
177    /// Flag to prevent new commits starting
178    stop: bool,
179}
180
181impl CommitFile {
182    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
183        Self {
184            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
185            map: WMap::default(),
186            todo: 0,
187            stop: false,
188        }
189    }
190
191    fn done_one(&mut self) {
192        self.todo -= 1;
193        if self.todo == 0 {
194            self.map = WMap::default();
195            self.stg.reset();
196        }
197    }
198}
199
200impl BasicStorage for CommitFile {
201    fn commit(&mut self, _size: u64) {
202        panic!()
203    }
204
205    fn size(&self) -> u64 {
206        panic!()
207    }
208
209    fn read(&self, start: u64, data: &mut [u8]) {
210        self.map.read(start, data, &self.stg);
211    }
212
213    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
214        self.map.write(start, data, off, len);
215    }
216
217    fn write(&mut self, _start: u64, _data: &[u8]) {
218        panic!()
219    }
220}
221
222/// Storage interface - BasicStorage is some kind of "file" storage.
223///
224/// read and write methods take a start which is a byte offset in the underlying file.
225pub trait BasicStorage: Send {
226    /// Get the size of the underlying storage.
227    /// Note : this is valid initially and after a commit but is not defined after write is called.
228    fn size(&self) -> u64;
229
230    /// Read data.
231    fn read(&self, start: u64, data: &mut [u8]);
232
233    /// Write byte slice to storage.
234    fn write(&mut self, start: u64, data: &[u8]);
235
236    /// Write byte Vec.
237    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
238        let len = data.len();
239        let d = Arc::new(data);
240        self.write_data(start, d, 0, len);
241    }
242
243    /// Write Data slice.
244    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
245        self.write(start, &data[off..off + len]);
246    }
247
248    /// Finish write transaction, size is new size of underlying storage.
249    fn commit(&mut self, size: u64);
250
251    /// Write u64.
252    fn write_u64(&mut self, start: u64, value: u64) {
253        self.write(start, &value.to_le_bytes());
254    }
255
256    /// Read u64.
257    fn read_u64(&self, start: u64) -> u64 {
258        let mut bytes = [0; 8];
259        self.read(start, &mut bytes);
260        u64::from_le_bytes(bytes)
261    }
262
263    /// Wait until current writes are complete.
264    fn wait_complete(&self){}
265
266    /// Called on program termination.
267    fn shutdown(&mut self){}
268}
269
/// BasicStorage with Sync and clone.
pub trait Storage: BasicStorage + Sync {
    /// Clone.
    ///
    /// Returns a boxed `Storage`. For the file- and memory-backed implementations in this
    /// file (`MemFile`, `SimpleFileStorage`, `AnyFileStorage`) the result shares the same
    /// underlying data as `self`.
    fn clone(&self) -> Box<dyn Storage>;
}
275
276/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
277#[derive(Default)]
278pub struct MemFile {
279    v: Arc<Mutex<Vec<u8>>>,
280}
281
282impl MemFile {
283    /// Get a new (boxed) MemFile.
284    pub fn new() -> Box<Self> {
285        Box::default()
286    }
287}
288
289impl Storage for MemFile {
290    fn clone(&self) -> Box<dyn Storage> {
291        Box::new(Self { v: self.v.clone() })
292    }
293}
294
295impl BasicStorage for MemFile {
296    fn size(&self) -> u64 {
297        let v = self.v.lock().unwrap();
298        v.len() as u64
299    }
300
301    fn read(&self, off: u64, bytes: &mut [u8]) {
302        let off = off as usize;
303        let len = bytes.len();
304        let mut v = self.v.lock().unwrap();
305        if off + len > v.len() {
306            v.resize(off + len, 0);
307        }
308        bytes.copy_from_slice(&v[off..off + len]);
309    }
310
311    fn write(&mut self, off: u64, bytes: &[u8]) {
312        let off = off as usize;
313        let len = bytes.len();
314        let mut v = self.v.lock().unwrap();
315        if off + len > v.len() {
316            v.resize(off + len, 0);
317        }
318        v[off..off + len].copy_from_slice(bytes);
319    }
320
321    fn commit(&mut self, size: u64) {
322        let mut v = self.v.lock().unwrap();
323        v.resize(size as usize, 0);
324    }
325}
326
327use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
328
/// Thin wrapper around an open [`fs::File`] providing positioned read/write.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Construct from filename. The file is opened read/write and created if missing;
    /// existing contents are kept.
    ///
    /// Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Current length of the file, found by seeking to its end.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Fill `bytes` from offset `off`.
    ///
    /// A single `read` call may return fewer bytes than requested, so keep reading until the
    /// buffer is full or end-of-file is reached. Bytes beyond end-of-file read as zero.
    /// Panics on an I/O error.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            match self.f.read(&mut bytes[done..]) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file read failed: {}", e),
            }
        }
        bytes[done..].fill(0);
    }

    /// Write all of `bytes` at offset `off`. Panics on an I/O error.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all retries short writes; a bare `write` may store only part of the data.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to `size` and flush data and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
374
375/// For atomic upd file, if not unix or windows.
376pub struct UpdFileStorage {
377    file: Cell<Option<FileInner>>,
378}
379
380impl UpdFileStorage {
381    /// Construct from filename.
382    pub fn new(filename: &str) -> Box<Self> {
383        Box::new(Self {
384            file: Cell::new(Some(FileInner::new(filename))),
385        })
386    }
387}
388
389impl BasicStorage for UpdFileStorage {
390    fn size(&self) -> u64 {
391        let mut f = self.file.take().unwrap();
392        let result = f.size();
393        self.file.set(Some(f));
394        result
395    }
396    fn read(&self, off: u64, bytes: &mut [u8]) {
397        let mut f = self.file.take().unwrap();
398        f.read(off, bytes);
399        self.file.set(Some(f));
400    }
401
402    fn write(&mut self, off: u64, bytes: &[u8]) {
403        let mut f = self.file.take().unwrap();
404        f.write(off, bytes);
405        self.file.set(Some(f));
406    }
407
408    fn commit(&mut self, size: u64) {
409        let mut f = self.file.take().unwrap();
410        f.commit(size);
411        self.file.set(Some(f));
412    }
413}
414
415/// Simple implementation of [Storage] using [`std::fs::File`].
416pub struct SimpleFileStorage {
417    file: Arc<Mutex<FileInner>>,
418}
419
420impl SimpleFileStorage {
421    /// Construct from filename.
422    pub fn new(filename: &str) -> Box<Self> {
423        Box::new(Self {
424            file: Arc::new(Mutex::new(FileInner::new(filename))),
425        })
426    }
427}
428
429impl Storage for SimpleFileStorage {
430    fn clone(&self) -> Box<dyn Storage> {
431        Box::new(Self {
432            file: self.file.clone(),
433        })
434    }
435}
436
437impl BasicStorage for SimpleFileStorage {
438    fn size(&self) -> u64 {
439        self.file.lock().unwrap().size()
440    }
441
442    fn read(&self, off: u64, bytes: &mut [u8]) {
443        self.file.lock().unwrap().read(off, bytes);
444    }
445
446    fn write(&mut self, off: u64, bytes: &[u8]) {
447        self.file.lock().unwrap().write(off, bytes);
448    }
449
450    fn commit(&mut self, size: u64) {
451        self.file.lock().unwrap().commit(size);
452    }
453}
454
455/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
456pub struct AnyFileStorage {
457    filename: String,
458    files: Arc<Mutex<Vec<FileInner>>>,
459}
460
461impl AnyFileStorage {
462    /// Create new.
463    pub fn new(filename: &str) -> Box<Self> {
464        Box::new(Self {
465            filename: filename.to_owned(),
466            files: Arc::new(Mutex::new(Vec::new())),
467        })
468    }
469
470    fn get_file(&self) -> FileInner {
471        match self.files.lock().unwrap().pop() {
472            Some(f) => f,
473            _ => FileInner::new(&self.filename),
474        }
475    }
476
477    fn put_file(&self, f: FileInner) {
478        self.files.lock().unwrap().push(f);
479    }
480}
481
482impl Storage for AnyFileStorage {
483    fn clone(&self) -> Box<dyn Storage> {
484        Box::new(Self {
485            filename: self.filename.clone(),
486            files: self.files.clone(),
487        })
488    }
489}
490
491impl BasicStorage for AnyFileStorage {
492    fn size(&self) -> u64 {
493        let mut f = self.get_file();
494        let result = f.size();
495        self.put_file(f);
496        result
497    }
498
499    fn read(&self, off: u64, bytes: &mut [u8]) {
500        let mut f = self.get_file();
501        f.read(off, bytes);
502        self.put_file(f);
503    }
504
505    fn write(&mut self, off: u64, bytes: &[u8]) {
506        let mut f = self.get_file();
507        f.write(off, bytes);
508        self.put_file(f);
509    }
510
511    fn commit(&mut self, size: u64) {
512        let mut f = self.get_file();
513        f.commit(size);
514        self.put_file(f);
515    }
516}
517
518/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
519pub struct DummyFile {}
520impl DummyFile {
521    /// Construct.
522    pub fn new() -> Box<Self> {
523        Box::new(Self {})
524    }
525}
526
527impl Storage for DummyFile {
528    fn clone(&self) -> Box<dyn Storage> {
529        Self::new()
530    }
531}
532
533impl BasicStorage for DummyFile {
534    fn size(&self) -> u64 {
535        0
536    }
537
538    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
539
540    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
541
542    fn commit(&mut self, _size: u64) {}
543}
544
/// Memory configuration limits for [`AtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    /// The documented defaults: 5000 map entries, 2MB read buffer, 1MB for each write buffer.
    fn default() -> Self {
        const MB: usize = 0x100000;
        let (map_lim, rbuf_mem) = (5000, 2 * MB);
        Self {
            map_lim,
            rbuf_mem,
            swbuf: MB,
            uwbuf: MB,
        }
    }
}
568
569/// Write Buffer.
570struct WriteBuffer {
571    /// Current write index into buf.
572    ix: usize,
573    /// Current file position.
574    pos: u64,
575    /// Underlying storage.
576    pub stg: Box<dyn BasicStorage>,
577    /// Buffer.
578    buf: Vec<u8>,
579}
580
581impl WriteBuffer {
582    /// Construct.
583    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
584        Self {
585            ix: 0,
586            pos: u64::MAX,
587            stg,
588            buf: vec![0; buf_size],
589        }
590    }
591
592    /// Write data to specified offset,
593    pub fn write(&mut self, off: u64, data: &[u8]) {
594        if self.pos + self.ix as u64 != off {
595            self.flush(off);
596        }
597        let mut done: usize = 0;
598        let mut todo: usize = data.len();
599        while todo > 0 {
600            let mut n: usize = self.buf.len() - self.ix;
601            if n == 0 {
602                self.flush(off + done as u64);
603                n = self.buf.len();
604            }
605            if n > todo {
606                n = todo;
607            }
608            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
609            todo -= n;
610            done += n;
611            self.ix += n;
612        }
613    }
614
615    fn flush(&mut self, new_pos: u64) {
616        if self.ix > 0 {
617            self.stg.write(self.pos, &self.buf[0..self.ix]);
618        }
619        self.ix = 0;
620        self.pos = new_pos;
621    }
622
623    /// Commit.
624    pub fn commit(&mut self, size: u64) {
625        self.flush(u64::MAX);
626        self.stg.commit(size);
627    }
628
629    /// Write u64.
630    pub fn write_u64(&mut self, start: u64, value: u64) {
631        self.write(start, &value.to_le_bytes());
632    }
633}
634
635/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
636///
637/// See implementation of AtomicFile for how this is used in conjunction with WMap.
638///
639/// N is buffer size.
640struct ReadBufStg<const N: usize> {
641    /// Underlying storage.
642    stg: Box<dyn Storage>,
643    /// Buffers.
644    buf: Mutex<ReadBuffer<N>>,
645    /// Read size that is considered small.
646    limit: usize,
647}
648
649impl<const N: usize> Drop for ReadBufStg<N> {
650    fn drop(&mut self) {
651        self.reset();
652    }
653}
654
655impl<const N: usize> ReadBufStg<N> {
656    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
657    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
658        Self {
659            stg,
660            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
661            limit,
662        }
663    }
664
665    /// Clears the buffers.
666    fn reset(&mut self) {
667        self.buf.lock().unwrap().reset();
668    }
669}
670
671impl<const N: usize> BasicStorage for ReadBufStg<N> {
672    /// Read data from storage.
673    fn read(&self, start: u64, data: &mut [u8]) {
674        if data.len() <= self.limit {
675            self.buf.lock().unwrap().read(&*self.stg, start, data);
676        } else {
677            self.stg.read(start, data);
678        }
679    }
680
681    /// Panics.
682    fn size(&self) -> u64 {
683        panic!()
684    }
685
686    /// Panics.
687    fn write(&mut self, _start: u64, _data: &[u8]) {
688        panic!();
689    }
690
691    /// Panics.
692    fn commit(&mut self, _size: u64) {
693        panic!();
694    }
695}
696
697struct ReadBuffer<const N: usize> {
698    /// Maps sector mumbers cached buffers.
699    map: HashMap<u64, Box<[u8; N]>>,
700    /// Maximum number of buffers.
701    max_buf: usize,
702}
703
704impl<const N: usize> ReadBuffer<N> {
705    fn new(max_buf: usize) -> Self {
706        Self {
707            map: HashMap::default(),
708            max_buf,
709        }
710    }
711
712    fn reset(&mut self) {
713        self.map.clear();
714    }
715
716    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
717        let mut done = 0;
718        while done < data.len() {
719            let off = off + done as u64;
720            let sector = off / N as u64;
721            let disp = (off % N as u64) as usize;
722            let amount = min(data.len() - done, N - disp);
723
724            let p = self.map.entry(sector).or_insert_with(|| {
725                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
726                stg.read(sector * N as u64, &mut *p);
727                p
728            });
729            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
730            done += amount;
731        }
732        if self.map.len() >= self.max_buf {
733            self.reset();
734        }
735    }
736}
737
738#[derive(Default)]
739/// Slice of Data to be written to storage.
740struct DataSlice {
741    /// Slice data.
742    pub data: Data,
743    /// Start of slice.
744    pub off: usize,
745    /// Length of slice.
746    pub len: usize,
747}
748
749impl DataSlice {
750    /// Get reference to the whole slice.
751    pub fn all(&self) -> &[u8] {
752        &self.data[self.off..self.off + self.len]
753    }
754    /// Get reference to part of slice.
755    pub fn part(&self, off: usize, len: usize) -> &[u8] {
756        &self.data[self.off + off..self.off + off + len]
757    }
758    /// Trim specified amount from start of slice.
759    pub fn trim(&mut self, trim: usize) {
760        self.off += trim;
761        self.len -= trim;
762    }
763    /// Take the data.
764    #[allow(dead_code)]
765    pub fn take(&mut self) -> Data {
766        std::mem::take(&mut self.data)
767    }
768}
769
#[derive(Default)]
/// Updateable store based on some underlying storage.
///
/// Holds a set of pending writes keyed by their end offset. `write` keeps the stored ranges
/// from overlapping by trimming, splitting or removing older writes that a new write covers.
struct WMap {
    /// Map of writes. Key is the end of the slice.
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec.
    ///
    /// Each element is `(start offset, slice)`; the map is left empty.
    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = GVec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are end offsets, so the start is recovered from the slice length.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    ///
    /// Every stored write is passed to `stg.write_data`; the map itself is not modified.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    ///
    /// Records the `len` bytes of `data` beginning at index `off` as a write at file offset
    /// `start`. A zero-length write is ignored.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            // Changes to the key set are collected here and applied after the iteration.
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Keys are end offsets, so this visits only writes that end after `start`.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            // Re-insert the surviving front parts, keyed by their new (shorter) end offsets.
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            // Finally record the new write itself.
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    ///
    /// Same case analysis as the non-`pstd` version, but performed in place through a map cursor.
    // NOTE(review): the cursor methods used here come from the `pstd` crate, which is not
    // visible in this file — confirm their positioning semantics against its documentation.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    ///
    /// Fills `data` with the bytes at file offset `start`; `u` supplies every range that no
    /// stored write covers.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            // Number of bytes of `data` filled so far.
            let mut done = 0;
            // Keys are end offsets, so this visits only writes that end after `start`.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                // `skip` is how far into the existing write the current read position lies.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Whatever remains lies beyond the last stored write.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
927
928/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
929pub struct BasicAtomicFile {
930    /// The main underlying storage.
931    stg: WriteBuffer,
932    /// Temporary storage for updates during commit.
933    upd: WriteBuffer,
934    /// Map of writes.
935    map: WMap,
936    /// List of writes.
937    list: GVec<(u64, DataSlice)>,
938    size: u64,
939    stop: bool,
940}
941
942impl BasicAtomicFile {
943    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
944    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
945        let size = stg.size();
946        let mut result = Box::new(Self {
947            stg: WriteBuffer::new(stg, lim.swbuf),
948            upd: WriteBuffer::new(upd, lim.uwbuf),
949            map: WMap::default(),
950            list: GVec::new(),
951            size,
952            stop: false,
953        });
954        result.init();
955        result
956    }
957
958    /// Apply outstanding updates.
959    fn init(&mut self) {
960        let end = self.upd.stg.read_u64(0);
961        let size = self.upd.stg.read_u64(8);
962        if end == 0 {
963            return;
964        }
965        assert!(end == self.upd.stg.size());
966        let mut pos = 16;
967        while pos < end {
968            let start = self.upd.stg.read_u64(pos);
969            pos += 8;
970            let len = self.upd.stg.read_u64(pos);
971            pos += 8;
972            let mut buf: GVec<u8> = gvec![0; len as usize];
973            self.upd.stg.read(pos, &mut buf);
974            pos += len;
975            self.stg.write(start, &buf);
976        }
977        self.stg.commit(size);
978        self.upd.commit(0);
979    }
980
981    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
982    pub fn commit_phase(&mut self, size: u64, phase: u8) {
983        if self.map.is_empty() && self.list.is_empty() {
984            return;
985        }
986        if phase == 1 {
987            self.list = self.map.convert_to_vec();
988
989            // Write the updates to upd.
990            // First set the end position to zero.
991            self.upd.write_u64(0, 0);
992            self.upd.write_u64(8, size);
993            self.upd.commit(16); // Not clear if this is necessary.
994
995            // Write the update records.
996            let mut stg_written = false;
997            let mut pos: u64 = 16;
998            for (start, v) in self.list.iter() {
999                let (start, len, data) = (*start, v.len as u64, v.all());
1000                if start >= self.size {
1001                    // Writes beyond current stg size can be written directly.
1002                    stg_written = true;
1003                    self.stg.write(start, data);
1004                } else {
1005                    self.upd.write_u64(pos, start);
1006                    pos += 8;
1007                    self.upd.write_u64(pos, len);
1008                    pos += 8;
1009                    self.upd.write(pos, data);
1010                    pos += len;
1011                }
1012            }
1013            if stg_written {
1014                self.stg.commit(size);
1015            }
1016            self.upd.commit(pos); // Not clear if this is necessary.
1017
1018            // Set the end position.
1019            self.upd.write_u64(0, pos);
1020            self.upd.write_u64(8, size);
1021            self.upd.commit(pos);
1022        } else {
1023            for (start, v) in self.list.iter() {
1024                if *start < self.size {
1025                    // Writes beyond current stg size have already been written.
1026                    self.stg.write(*start, v.all());
1027                }
1028            }
1029            self.list = GVec::new();
1030            self.stg.commit(size);
1031            self.upd.commit(0);
1032        }
1033    }
1034}
1035
1036impl BasicStorage for BasicAtomicFile {
1037    fn commit(&mut self, size: u64) {
1038        if self.stop { return; }
1039        self.commit_phase(size, 1);
1040        self.commit_phase(size, 2);
1041        self.size = size;
1042    }
1043
1044    fn size(&self) -> u64 {
1045        self.size
1046    }
1047
1048    fn read(&self, start: u64, data: &mut [u8]) {
1049        self.map.read(start, data, &*self.stg.stg);
1050    }
1051
1052    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1053        self.map.write(start, data, off, len);
1054    }
1055
1056    fn write(&mut self, start: u64, data: &[u8]) {
1057        let len = data.len();
1058        let d = Arc::new(data.to_vec());
1059        self.write_data(start, d, 0, len);
1060    }
1061
1062    fn shutdown(&mut self)
1063    {
1064        self.stop = true;
1065    }
1066}
1067
/// Optimized implementation of [Storage] ( unix only ).
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// Current size: the file length found on opening, then the size of the last commit.
    size: Arc<Mutex<u64>>,
    /// The open file.
    f: fs::File,
}

#[cfg(target_family = "unix")]
impl UnixFileStorage {
    /// Construct from filename.
    ///
    /// The file is opened for reading and writing, created if it does not exist and
    /// never truncated. The initial size is the end position of the file.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened or its end position cannot be found.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        let end = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(end)),
            f,
        })
    }
}
1090
1091#[cfg(target_family = "unix")]
1092impl Storage for UnixFileStorage {
1093    fn clone(&self) -> Box<dyn Storage> {
1094        Box::new(Self {
1095            size: self.size.clone(),
1096            f: self.f.try_clone().unwrap(),
1097        })
1098    }
1099}
1100
1101#[cfg(target_family = "unix")]
1102use std::os::unix::fs::FileExt;
1103
1104#[cfg(target_family = "unix")]
1105impl BasicStorage for UnixFileStorage {
1106    fn read(&self, start: u64, data: &mut [u8]) {
1107        let _ = self.f.read_at(data, start);
1108    }
1109
1110    fn write(&mut self, start: u64, data: &[u8]) {
1111        let _ = self.f.write_at(data, start);
1112    }
1113
1114    fn size(&self) -> u64 {
1115        *self.size.lock().unwrap()
1116    }
1117
1118    fn commit(&mut self, size: u64) {
1119        *self.size.lock().unwrap() = size;
1120        self.f.set_len(size).unwrap();
1121        self.f.sync_all().unwrap();
1122    }
1123}
1124
/// Optimized implementation of [Storage] ( windows only ).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Current size: the file length found on opening, then the size of the last commit.
    size: Arc<Mutex<u64>>,
    /// The open file.
    f: fs::File,
}

#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename.
    ///
    /// The file is opened for reading and writing, created if it does not exist and
    /// never truncated. The initial size is the end position of the file.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened or its end position cannot be found.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        let end = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(end)),
            f,
        })
    }
}
1147
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Make a second handle onto the same file which shares the size value with `self`.
    ///
    /// # Panics
    ///
    /// Panics if the file handle cannot be duplicated.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1157
1158#[cfg(target_family = "windows")]
1159use std::os::windows::fs::FileExt;
1160
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read bytes starting at offset `start` into `data`.
    ///
    /// A single `seek_read` call may return fewer bytes than requested, so reading is
    /// repeated until `data` is full or the end of the file is reached. Any part of
    /// `data` beyond the end of the file is left unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the read fails.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                // End of file.
                Ok(0) => break,
                Ok(n) => done += n,
                // An interrupted read transferred nothing, simply try again.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage read failed: {}", e),
            }
        }
    }

    /// Write all of `data` at offset `start`.
    ///
    /// A single `seek_write` call may write only part of `data`, so writing is repeated
    /// until everything has been written.
    ///
    /// # Panics
    ///
    /// Panics if the write fails or makes no progress, as a silently lost write would
    /// corrupt the storage.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => panic!("WindowsFileStorage write failed: no bytes written"),
                Ok(n) => done += n,
                // An interrupted write transferred nothing, simply try again.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage write failed: {}", e),
            }
        }
    }

    /// The shared size value.
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record `size`, set the file length to it and sync the file.
    ///
    /// # Panics
    ///
    /// Panics if setting the length or syncing fails.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1181
1182/// Optimised Storage ( varies according to platform ).
1183#[cfg(target_family = "windows")]
1184pub type MultiFileStorage = WindowsFileStorage;
1185
1186/// Optimised Storage ( varies according to platform ).
1187#[cfg(target_family = "unix")]
1188pub type MultiFileStorage = UnixFileStorage;
1189
1190/// Optimised Storage ( varies according to platform ).
1191#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1192pub type MultiFileStorage = AnyFileStorage;
1193
1194/// Fast Storage for upd file ( varies according to platform ).
1195#[cfg(any(target_family = "windows", target_family = "unix"))]
1196pub type FastFileStorage = MultiFileStorage;
1197
1198/// Fast Storage for upd file ( varies according to platform ).
1199#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1200pub type FastFileStorage = UpdFileStorage;
1201
/// Get amount of testing from environment variable TA.
///
/// The amount is 1 if TA cannot be read. Panics if TA is set to something that does not
/// parse as a `usize`.
#[cfg(test)]
fn test_amount() -> usize {
    let ta = std::env::var("TA").unwrap_or_else(|_| String::from("1"));
    ta.parse().unwrap()
}
1207
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Apply the same random writes, reads and commits to an AtomicFile and to a MemFile,
    // checking that every read gives the same bytes from both.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        // Alternative subject for the same test:
        // let mut s1 = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random draws are made in a fixed order: offset, length, operation.
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            let is_write: bool = rng.r#gen();
            if is_write {
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                s1.write(off, &bytes);
                s2.write(off, &bytes);
            } else {
                let mut from_s1 = vec![0; len];
                let mut from_s2 = vec![0; len];
                s1.read(off, &mut from_s1);
                s2.read(off, &mut from_s2);
                assert!(from_s1 == from_s2);
            }
            // Commit both, on average once every 50 operations.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}