Skip to main content

atom_file/
lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4//!
5//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
6//!
7//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.
8//!
9//!# Features
10//!
11//! This crate supports the following cargo features:
12//! - `pstd` : Use pstd crate for `BTreeMap` (allocated in `GTemp`).
13//! - `unsafe-optim` : Enable unsafe optimisations in release mode.
14
15#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24    VecA,
25    collections::{BTreeMapA, btree_map::CustomTuning},
26    localalloc::GTemp,
27    veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
33#[cfg(feature = "pstd")]
34type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;
35
36#[cfg(feature = "pstd")]
37type GVec<T> = VecA<T, GTemp>;
38
39/// ```Arc<Vec<u8>>```
40pub type Data = Arc<Vec<u8>>;
41
42/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
43/// Performs commit asyncronously.
44///
45/// #Example
46///
47/// ```
48/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
49/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
50/// af.write( 0, &[1,2,3,4] );
51/// af.commit(4);
52/// af.wait_complete();
53/// ```
54///
55/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
56/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
57/// the updates to underlying storage have been applied.
58pub struct AtomicFile {
59    /// New updates are written here.
60    map: WMap,
61    /// Underlying file, with previous updates mapped.
62    cf: Arc<RwLock<CommitFile>>,
63    /// File size.
64    size: u64,
65    /// For sending update maps to be saved.
66    tx: std::sync::mpsc::Sender<(u64, WMap)>,
67    /// Held by update process while it is active.
68    busy: Arc<Mutex<()>>,
69    /// Limit on size of CommitFile map.
70    map_lim: usize,
71}
72
73impl AtomicFile {
74    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
75    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76        Self::new_with_limits(stg, upd, &Limits::default())
77    }
78
79    /// Construct Atomic file with specified limits.
80    pub fn new_with_limits(
81        stg: Box<dyn Storage>,
82        upd: Box<dyn BasicStorage>,
83        lim: &Limits,
84    ) -> Box<Self> {
85        let size = stg.size();
86        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.
91
92        // Start the thread which does save asyncronously.
93        let (cf1, busy1) = (cf.clone(), busy.clone());
94
95        std::thread::spawn(move || {
96            // Loop that recieves a map of updates and applies it to BasicAtomicFile.
97            while let Ok((size, map)) = rx.recv() {
98                let _lock = busy1.lock();
99                baf.map = map;
100                baf.commit(size);
101                cf1.write().unwrap().done_one();
102            }
103        });
104        Box::new(Self {
105            map: WMap::default(),
106            cf,
107            size,
108            tx,
109            busy,
110            map_lim: lim.map_lim,
111        })
112    }
113}
114
115impl Storage for AtomicFile {
116    fn clone(&self) -> Box<dyn Storage> {
117        panic!()
118    }
119}
120
121impl BasicStorage for AtomicFile {
122    fn commit(&mut self, size: u64) {
123        self.size = size;
124        if self.map.is_empty() {
125            return;
126        }
127        if self.cf.read().unwrap().map.len() > self.map_lim {
128            self.wait_complete();
129        }
130        let map = std::mem::take(&mut self.map);
131        let stop =
132        {
133            let cf = &mut *self.cf.write().unwrap();
134            if cf.stop { true }
135            else
136            {
137                cf.todo += 1;
138                // Apply map of updates to CommitFile.
139                map.to_storage(cf);
140                // Send map of updates to thread to be written to underlying storage.
141                self.tx.send((size, map)).unwrap();
142                false
143            }
144        };
145        if stop {
146            // Program is terminating, loop forever.
147            loop { std::thread::sleep(std::time::Duration::from_millis(100)); }
148        }
149    }
150
151    fn size(&self) -> u64 {
152        self.size
153    }
154
155    fn read(&self, start: u64, data: &mut [u8]) {
156        self.map.read(start, data, &*self.cf.read().unwrap());
157    }
158
159    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
160        self.map.write(start, data, off, len);
161    }
162
163    fn write(&mut self, start: u64, data: &[u8]) {
164        let len = data.len();
165        let d = Arc::new(data.to_vec());
166        self.write_data(start, d, 0, len);
167    }
168
169    fn wait_complete(&self) {
170       while self.cf.read().unwrap().todo != 0 {
171           std::thread::yield_now();
172           let _x = self.busy.lock();
173       }
174    }   
175
176    fn shutdown(&mut self) {
177        self.cf.write().unwrap().stop = true; // Prevents new commits from being added.
178        self.wait_complete();
179    }       
180}
181
182struct CommitFile {
183    /// Buffered underlying storage.
184    stg: ReadBufStg<256>,
185    /// Map of committed updates.
186    map: WMap,
187    /// Number of outstanding unsaved commits.
188    todo: usize,
189    /// Flag to prevent new commits starting
190    stop: bool,
191}
192
193impl CommitFile {
194    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
195        Self {
196            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
197            map: WMap::default(),
198            todo: 0,
199            stop: false,
200        }
201    }
202
203    fn done_one(&mut self) {
204        self.todo -= 1;
205        if self.todo == 0 {
206            self.map = WMap::default();
207            self.stg.reset();
208        }
209    }
210}
211
212impl BasicStorage for CommitFile {
213    fn commit(&mut self, _size: u64) {
214        panic!()
215    }
216
217    fn size(&self) -> u64 {
218        panic!()
219    }
220
221    fn read(&self, start: u64, data: &mut [u8]) {
222        self.map.read(start, data, &self.stg);
223    }
224
225    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
226        self.map.write(start, data, off, len);
227    }
228
229    fn write(&mut self, _start: u64, _data: &[u8]) {
230        panic!()
231    }
232}
233
234/// Storage interface - BasicStorage is some kind of "file" storage.
235///
236/// read and write methods take a start which is a byte offset in the underlying file.
237pub trait BasicStorage: Send {
238    /// Get the size of the underlying storage.
239    /// Note : this is valid initially and after a commit but is not defined after write is called.
240    fn size(&self) -> u64;
241
242    /// Read data.
243    fn read(&self, start: u64, data: &mut [u8]);
244
245    /// Write byte slice to storage.
246    fn write(&mut self, start: u64, data: &[u8]);
247
248    /// Write byte Vec.
249    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
250        let len = data.len();
251        let d = Arc::new(data);
252        self.write_data(start, d, 0, len);
253    }
254
255    /// Write Data slice.
256    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
257        self.write(start, &data[off..off + len]);
258    }
259
260    /// Finish write transaction, size is new size of underlying storage.
261    fn commit(&mut self, size: u64);
262
263    /// Write u64.
264    fn write_u64(&mut self, start: u64, value: u64) {
265        self.write(start, &value.to_le_bytes());
266    }
267
268    /// Read u64.
269    fn read_u64(&self, start: u64) -> u64 {
270        let mut bytes = [0; 8];
271        self.read(start, &mut bytes);
272        u64::from_le_bytes(bytes)
273    }
274
275    /// Wait until current writes are complete.
276    fn wait_complete(&self){}
277
278    /// Called on program termination.
279    fn shutdown(&mut self){}
280}
281
282/// BasicStorage with Sync and clone.
283pub trait Storage: BasicStorage + Sync {
284    /// Clone.
285    fn clone(&self) -> Box<dyn Storage>;
286}
287
288/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
289#[derive(Default)]
290pub struct MemFile {
291    v: Arc<Mutex<Vec<u8>>>,
292}
293
294impl MemFile {
295    /// Get a new (boxed) MemFile.
296    pub fn new() -> Box<Self> {
297        Box::default()
298    }
299}
300
301impl Storage for MemFile {
302    fn clone(&self) -> Box<dyn Storage> {
303        Box::new(Self { v: self.v.clone() })
304    }
305}
306
307impl BasicStorage for MemFile {
308    fn size(&self) -> u64 {
309        let v = self.v.lock().unwrap();
310        v.len() as u64
311    }
312
313    fn read(&self, off: u64, bytes: &mut [u8]) {
314        let off = off as usize;
315        let len = bytes.len();
316        let mut v = self.v.lock().unwrap();
317        if off + len > v.len() {
318            v.resize(off + len, 0);
319        }
320        bytes.copy_from_slice(&v[off..off + len]);
321    }
322
323    fn write(&mut self, off: u64, bytes: &[u8]) {
324        let off = off as usize;
325        let len = bytes.len();
326        let mut v = self.v.lock().unwrap();
327        if off + len > v.len() {
328            v.resize(off + len, 0);
329        }
330        v[off..off + len].copy_from_slice(bytes);
331    }
332
333    fn commit(&mut self, size: u64) {
334        let mut v = self.v.lock().unwrap();
335        v.resize(size as usize, 0);
336    }
337}
338
339use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
340
/// An open file handle with positioned read / write helpers.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Construct from filename. The file is opened for reading and writing,
    /// created if it does not exist, and never truncated. Panics if it cannot be opened.
    pub fn new(filename: &str) -> Self {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        Self { f }
    }

    /// Size of the file, found by seeking to the end.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read bytes starting at `off`.
    ///
    /// A single `read` call may return fewer bytes than requested, so reading is repeated
    /// until the buffer is full or the end of the file is reached. Bytes of the buffer
    /// beyond the end of the file are left unchanged.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut filled = 0;
        while filled < bytes.len() {
            match self.f.read(&mut bytes[filled..]) {
                Ok(0) => break, // End of file.
                Ok(n) => filled += n,
                // An interrupted read transferred nothing and can simply be retried.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file read failed: {}", e),
            }
        }
    }

    /// Write all of `bytes` starting at `off`.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all keeps writing until every byte is written; a plain write may stop short.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length and sync data and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
386
387/// For atomic upd file, if not unix or windows.
388pub struct UpdFileStorage {
389    file: Cell<Option<FileInner>>,
390}
391
392impl UpdFileStorage {
393    /// Construct from filename.
394    pub fn new(filename: &str) -> Box<Self> {
395        Box::new(Self {
396            file: Cell::new(Some(FileInner::new(filename))),
397        })
398    }
399}
400
401impl BasicStorage for UpdFileStorage {
402    fn size(&self) -> u64 {
403        let mut f = self.file.take().unwrap();
404        let result = f.size();
405        self.file.set(Some(f));
406        result
407    }
408    fn read(&self, off: u64, bytes: &mut [u8]) {
409        let mut f = self.file.take().unwrap();
410        f.read(off, bytes);
411        self.file.set(Some(f));
412    }
413
414    fn write(&mut self, off: u64, bytes: &[u8]) {
415        let mut f = self.file.take().unwrap();
416        f.write(off, bytes);
417        self.file.set(Some(f));
418    }
419
420    fn commit(&mut self, size: u64) {
421        let mut f = self.file.take().unwrap();
422        f.commit(size);
423        self.file.set(Some(f));
424    }
425}
426
427/// Simple implementation of [Storage] using [`std::fs::File`].
428pub struct SimpleFileStorage {
429    file: Arc<Mutex<FileInner>>,
430}
431
432impl SimpleFileStorage {
433    /// Construct from filename.
434    pub fn new(filename: &str) -> Box<Self> {
435        Box::new(Self {
436            file: Arc::new(Mutex::new(FileInner::new(filename))),
437        })
438    }
439}
440
441impl Storage for SimpleFileStorage {
442    fn clone(&self) -> Box<dyn Storage> {
443        Box::new(Self {
444            file: self.file.clone(),
445        })
446    }
447}
448
449impl BasicStorage for SimpleFileStorage {
450    fn size(&self) -> u64 {
451        self.file.lock().unwrap().size()
452    }
453
454    fn read(&self, off: u64, bytes: &mut [u8]) {
455        self.file.lock().unwrap().read(off, bytes);
456    }
457
458    fn write(&mut self, off: u64, bytes: &[u8]) {
459        self.file.lock().unwrap().write(off, bytes);
460    }
461
462    fn commit(&mut self, size: u64) {
463        self.file.lock().unwrap().commit(size);
464    }
465}
466
467/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
468pub struct AnyFileStorage {
469    filename: String,
470    files: Arc<Mutex<Vec<FileInner>>>,
471}
472
473impl AnyFileStorage {
474    /// Create new.
475    pub fn new(filename: &str) -> Box<Self> {
476        Box::new(Self {
477            filename: filename.to_owned(),
478            files: Arc::new(Mutex::new(Vec::new())),
479        })
480    }
481
482    fn get_file(&self) -> FileInner {
483        match self.files.lock().unwrap().pop() {
484            Some(f) => f,
485            _ => FileInner::new(&self.filename),
486        }
487    }
488
489    fn put_file(&self, f: FileInner) {
490        self.files.lock().unwrap().push(f);
491    }
492}
493
494impl Storage for AnyFileStorage {
495    fn clone(&self) -> Box<dyn Storage> {
496        Box::new(Self {
497            filename: self.filename.clone(),
498            files: self.files.clone(),
499        })
500    }
501}
502
503impl BasicStorage for AnyFileStorage {
504    fn size(&self) -> u64 {
505        let mut f = self.get_file();
506        let result = f.size();
507        self.put_file(f);
508        result
509    }
510
511    fn read(&self, off: u64, bytes: &mut [u8]) {
512        let mut f = self.get_file();
513        f.read(off, bytes);
514        self.put_file(f);
515    }
516
517    fn write(&mut self, off: u64, bytes: &[u8]) {
518        let mut f = self.get_file();
519        f.write(off, bytes);
520        self.put_file(f);
521    }
522
523    fn commit(&mut self, size: u64) {
524        let mut f = self.get_file();
525        f.commit(size);
526        self.put_file(f);
527    }
528}
529
530/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
531pub struct DummyFile {}
532impl DummyFile {
533    /// Construct.
534    pub fn new() -> Box<Self> {
535        Box::new(Self {})
536    }
537}
538
539impl Storage for DummyFile {
540    fn clone(&self) -> Box<dyn Storage> {
541        Self::new()
542    }
543}
544
545impl BasicStorage for DummyFile {
546    fn size(&self) -> u64 {
547        0
548    }
549
550    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
551
552    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
553
554    fn commit(&mut self, _size: u64) {}
555}
556
/// Memory configuration limits for [`AtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    /// The default limits, as documented on each field.
    fn default() -> Self {
        // One megabyte, in bytes.
        let mb: usize = 1 << 20;
        Limits {
            map_lim: 5000,
            rbuf_mem: 2 * mb,
            swbuf: mb,
            uwbuf: mb,
        }
    }
}
580
581/// Write Buffer.
582struct WriteBuffer {
583    /// Current write index into buf.
584    ix: usize,
585    /// Current file position.
586    pos: u64,
587    /// Underlying storage.
588    pub stg: Box<dyn BasicStorage>,
589    /// Buffer.
590    buf: Vec<u8>,
591}
592
593impl WriteBuffer {
594    /// Construct.
595    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
596        Self {
597            ix: 0,
598            pos: u64::MAX,
599            stg,
600            buf: vec![0; buf_size],
601        }
602    }
603
604    /// Write data to specified offset,
605    pub fn write(&mut self, off: u64, data: &[u8]) {
606        if self.pos + self.ix as u64 != off {
607            self.flush(off);
608        }
609        let mut done: usize = 0;
610        let mut todo: usize = data.len();
611        while todo > 0 {
612            let mut n: usize = self.buf.len() - self.ix;
613            if n == 0 {
614                self.flush(off + done as u64);
615                n = self.buf.len();
616            }
617            if n > todo {
618                n = todo;
619            }
620            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
621            todo -= n;
622            done += n;
623            self.ix += n;
624        }
625    }
626
627    fn flush(&mut self, new_pos: u64) {
628        if self.ix > 0 {
629            self.stg.write(self.pos, &self.buf[0..self.ix]);
630        }
631        self.ix = 0;
632        self.pos = new_pos;
633    }
634
635    /// Commit.
636    pub fn commit(&mut self, size: u64) {
637        self.flush(u64::MAX);
638        self.stg.commit(size);
639    }
640
641    /// Write u64.
642    pub fn write_u64(&mut self, start: u64, value: u64) {
643        self.write(start, &value.to_le_bytes());
644    }
645}
646
647/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
648///
649/// See implementation of AtomicFile for how this is used in conjunction with WMap.
650///
651/// N is buffer size.
652struct ReadBufStg<const N: usize> {
653    /// Underlying storage.
654    stg: Box<dyn Storage>,
655    /// Buffers.
656    buf: Mutex<ReadBuffer<N>>,
657    /// Read size that is considered small.
658    limit: usize,
659}
660
661impl<const N: usize> Drop for ReadBufStg<N> {
662    fn drop(&mut self) {
663        self.reset();
664    }
665}
666
667impl<const N: usize> ReadBufStg<N> {
668    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
669    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
670        Self {
671            stg,
672            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
673            limit,
674        }
675    }
676
677    /// Clears the buffers.
678    fn reset(&mut self) {
679        self.buf.lock().unwrap().reset();
680    }
681}
682
683impl<const N: usize> BasicStorage for ReadBufStg<N> {
684    /// Read data from storage.
685    fn read(&self, start: u64, data: &mut [u8]) {
686        if data.len() <= self.limit {
687            self.buf.lock().unwrap().read(&*self.stg, start, data);
688        } else {
689            self.stg.read(start, data);
690        }
691    }
692
693    /// Panics.
694    fn size(&self) -> u64 {
695        panic!()
696    }
697
698    /// Panics.
699    fn write(&mut self, _start: u64, _data: &[u8]) {
700        panic!();
701    }
702
703    /// Panics.
704    fn commit(&mut self, _size: u64) {
705        panic!();
706    }
707}
708
709struct ReadBuffer<const N: usize> {
710    /// Maps sector mumbers cached buffers.
711    map: HashMap<u64, Box<[u8; N]>>,
712    /// Maximum number of buffers.
713    max_buf: usize,
714}
715
716impl<const N: usize> ReadBuffer<N> {
717    fn new(max_buf: usize) -> Self {
718        Self {
719            map: HashMap::default(),
720            max_buf,
721        }
722    }
723
724    fn reset(&mut self) {
725        self.map.clear();
726    }
727
728    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
729        let mut done = 0;
730        while done < data.len() {
731            let off = off + done as u64;
732            let sector = off / N as u64;
733            let disp = (off % N as u64) as usize;
734            let amount = min(data.len() - done, N - disp);
735
736            let p = self.map.entry(sector).or_insert_with(|| {
737                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
738                stg.read(sector * N as u64, &mut *p);
739                p
740            });
741            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
742            done += amount;
743        }
744        if self.map.len() >= self.max_buf {
745            self.reset();
746        }
747    }
748}
749
750#[derive(Default)]
751/// Slice of Data to be written to storage.
752struct DataSlice {
753    /// Slice data.
754    pub data: Data,
755    /// Start of slice.
756    pub off: usize,
757    /// Length of slice.
758    pub len: usize,
759}
760
761impl DataSlice {
762    /// Get reference to the whole slice.
763    pub fn all(&self) -> &[u8] {
764        &self.data[self.off..self.off + self.len]
765    }
766    /// Get reference to part of slice.
767    pub fn part(&self, off: usize, len: usize) -> &[u8] {
768        &self.data[self.off + off..self.off + off + len]
769    }
770    /// Trim specified amount from start of slice.
771    pub fn trim(&mut self, trim: usize) {
772        self.off += trim;
773        self.len -= trim;
774    }
775    /// Take the data.
776    #[allow(dead_code)]
777    pub fn take(&mut self) -> Data {
778        std::mem::take(&mut self.data)
779    }
780}
781
782#[derive(Default)]
783/// Updateable store based on some underlying storage.
784struct WMap {
785    /// Map of writes. Key is the end of the slice.
786    map: BTreeMap<u64, DataSlice>,
787}
788
789impl WMap {
790    /// Is the map empty?
791    pub fn is_empty(&self) -> bool {
792        self.map.is_empty()
793    }
794
795    /// Number of key-value pairs in the map.
796    pub fn len(&self) -> usize {
797        self.map.len()
798    }
799
800    /// Take the map and convert it to a Vec.
801    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
802        let map = std::mem::take(&mut self.map);
803        let mut result = GVec::with_capacity(map.len());
804        for (end, v) in map {
805            let start = end - v.len as u64;
806            result.push((start, v));
807        }
808        result
809    }
810
811    /// Write the map into storage.
812    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
813        for (end, v) in self.map.iter() {
814            let start = end - v.len as u64;
815            stg.write_data(start, v.data.clone(), v.off, v.len);
816        }
817    }
818
819    #[cfg(not(feature = "pstd"))]
820    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
821    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
822        if len != 0 {
823            let (mut insert, mut remove) = (Vec::new(), Vec::new());
824            let end = start + len as u64;
825            for (ee, v) in self.map.range_mut(start + 1..) {
826                let ee = *ee;
827                let es = ee - v.len as u64; // Existing write Start.
828                if es >= end {
829                    // Existing write starts after end of new write, nothing to do.
830                    break;
831                } else if start <= es {
832                    if end < ee {
833                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
834                        v.trim((end - es) as usize);
835                        break;
836                    }
837                    // New write subsumes existing write entirely, remove existing write.
838                    remove.push(ee);
839                } else if end < ee {
840                    // New write starts in middle of existing write, ends before end of existing write,
841                    // put start of existing write in insert list, trim existing write.
842                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
843                    v.trim((end - es) as usize);
844                    break;
845                } else {
846                    // New write starts in middle of existing write, ends after existing write,
847                    // put start of existing write in insert list, remove existing write.
848                    insert.push((es, v.take(), v.off, (start - es) as usize));
849                    remove.push(ee);
850                }
851            }
852            for end in remove {
853                self.map.remove(&end);
854            }
855            for (start, data, off, len) in insert {
856                self.map
857                    .insert(start + len as u64, DataSlice { data, off, len });
858            }
859            self.map
860                .insert(start + len as u64, DataSlice { data, off, len });
861        }
862    }
863
864    #[cfg(feature = "pstd")]
865    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
866    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
867        if len != 0 {
868            let end = start + len as u64;
869            let mut c = self
870                .map
871                .lower_bound_mut(std::ops::Bound::Excluded(&start))
872                .with_mutable_key();
873            while let Some((eend, v)) = c.next() {
874                let ee = *eend;
875                let es = ee - v.len as u64; // Existing write Start.
876                if es >= end {
877                    // Existing write starts after end of new write, nothing to do.
878                    c.prev();
879                    break;
880                } else if start <= es {
881                    if end < ee {
882                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
883                        v.trim((end - es) as usize);
884                        c.prev();
885                        break;
886                    }
887                    // New write subsumes existing write entirely, remove existing write.
888                    c.remove_prev();
889                } else if end < ee {
890                    // New write starts in middle of existing write, ends before end of existing write,
891                    // trim existing write, insert start of existing write.
892                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
893                    v.trim((end - es) as usize);
894                    c.prev();
895                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
896                    break;
897                } else {
898                    // New write starts in middle of existing write, ends after existing write,
899                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
900                    v.len = (start - es) as usize;
901                    *eend = es + v.len as u64;
902                }
903            }
904            // Insert the new write.
905            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
906        }
907    }
908
909    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
910    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
911        let len = data.len();
912        if len != 0 {
913            let mut done = 0;
914            for (&end, v) in self.map.range(start + 1..) {
915                let es = end - v.len as u64; // Existing write Start.
916                let doff = start + done as u64;
917                if es > doff {
918                    // Read from underlying storage.
919                    let a = min(len - done, (es - doff) as usize);
920                    u.read(doff, &mut data[done..done + a]);
921                    done += a;
922                    if done == len {
923                        return;
924                    }
925                }
926                // Use existing write.
927                let skip = (start + done as u64 - es) as usize;
928                let a = min(len - done, v.len - skip);
929                data[done..done + a].copy_from_slice(v.part(skip, a));
930                done += a;
931                if done == len {
932                    return;
933                }
934            }
935            u.read(start + done as u64, &mut data[done..]);
936        }
937    }
938}
939
940/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
941pub struct BasicAtomicFile {
942    /// The main underlying storage.
943    stg: WriteBuffer,
944    /// Temporary storage for updates during commit.
945    upd: WriteBuffer,
946    /// Map of writes.
947    map: WMap,
948    /// List of writes.
949    list: GVec<(u64, DataSlice)>,
950    size: u64,
951    stop: bool,
952}
953
954impl BasicAtomicFile {
955    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
956    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
957        let size = stg.size();
958        let mut result = Box::new(Self {
959            stg: WriteBuffer::new(stg, lim.swbuf),
960            upd: WriteBuffer::new(upd, lim.uwbuf),
961            map: WMap::default(),
962            list: GVec::new(),
963            size,
964            stop: false,
965        });
966        result.init();
967        result
968    }
969
970    /// Apply outstanding updates.
971    fn init(&mut self) {
972        let end = self.upd.stg.read_u64(0);
973        let size = self.upd.stg.read_u64(8);
974        if end == 0 {
975            return;
976        }
977        assert!(end == self.upd.stg.size());
978        let mut pos = 16;
979        while pos < end {
980            let start = self.upd.stg.read_u64(pos);
981            pos += 8;
982            let len = self.upd.stg.read_u64(pos);
983            pos += 8;
984            let mut buf: GVec<u8> = gvec![0; len as usize];
985            self.upd.stg.read(pos, &mut buf);
986            pos += len;
987            self.stg.write(start, &buf);
988        }
989        self.stg.commit(size);
990        self.upd.commit(0);
991    }
992
993    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
994    pub fn commit_phase(&mut self, size: u64, phase: u8) {
995        if self.map.is_empty() && self.list.is_empty() {
996            return;
997        }
998        if phase == 1 {
999            self.list = self.map.convert_to_vec();
1000
1001            // Write the updates to upd.
1002            // First set the end position to zero.
1003            self.upd.write_u64(0, 0);
1004            self.upd.write_u64(8, size);
1005            self.upd.commit(16); // Not clear if this is necessary.
1006
1007            // Write the update records.
1008            let mut stg_written = false;
1009            let mut pos: u64 = 16;
1010            for (start, v) in self.list.iter() {
1011                let (start, len, data) = (*start, v.len as u64, v.all());
1012                if start >= self.size {
1013                    // Writes beyond current stg size can be written directly.
1014                    stg_written = true;
1015                    self.stg.write(start, data);
1016                } else {
1017                    self.upd.write_u64(pos, start);
1018                    pos += 8;
1019                    self.upd.write_u64(pos, len);
1020                    pos += 8;
1021                    self.upd.write(pos, data);
1022                    pos += len;
1023                }
1024            }
1025            if stg_written {
1026                self.stg.commit(size);
1027            }
1028            self.upd.commit(pos); // Not clear if this is necessary.
1029
1030            // Set the end position.
1031            self.upd.write_u64(0, pos);
1032            self.upd.write_u64(8, size);
1033            self.upd.commit(pos);
1034        } else {
1035            for (start, v) in self.list.iter() {
1036                if *start < self.size {
1037                    // Writes beyond current stg size have already been written.
1038                    self.stg.write(*start, v.all());
1039                }
1040            }
1041            self.list = GVec::new();
1042            self.stg.commit(size);
1043            self.upd.commit(0);
1044        }
1045    }
1046}
1047
1048impl BasicStorage for BasicAtomicFile {
1049    fn commit(&mut self, size: u64) {
1050        if self.stop { return; }
1051        self.commit_phase(size, 1);
1052        self.commit_phase(size, 2);
1053        self.size = size;
1054    }
1055
1056    fn size(&self) -> u64 {
1057        self.size
1058    }
1059
1060    fn read(&self, start: u64, data: &mut [u8]) {
1061        self.map.read(start, data, &*self.stg.stg);
1062    }
1063
1064    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1065        self.map.write(start, data, off, len);
1066    }
1067
1068    fn write(&mut self, start: u64, data: &[u8]) {
1069        let len = data.len();
1070        let d = Arc::new(data.to_vec());
1071        self.write_data(start, d, 0, len);
1072    }
1073
1074    fn shutdown(&mut self)
1075    {
1076        self.stop = true;
1077    }
1078}
1079
/// Optimized implementation of [Storage] ( unix only ), using positioned reads and writes.
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// Committed size, shared with every clone of this storage.
    size: Arc<Mutex<u64>>,
    /// The open file.
    f: fs::File,
}
1086#[cfg(target_family = "unix")]
1087impl UnixFileStorage {
1088    /// Construct from filename.
1089    pub fn new(filename: &str) -> Box<Self> {
1090        let mut f = OpenOptions::new()
1091            .read(true)
1092            .write(true)
1093            .create(true)
1094            .truncate(false)
1095            .open(filename)
1096            .unwrap();
1097        let size = f.seek(SeekFrom::End(0)).unwrap();
1098        let size = Arc::new(Mutex::new(size));
1099        Box::new(Self { size, f })
1100    }
1101}
1102
1103#[cfg(target_family = "unix")]
1104impl Storage for UnixFileStorage {
1105    fn clone(&self) -> Box<dyn Storage> {
1106        Box::new(Self {
1107            size: self.size.clone(),
1108            f: self.f.try_clone().unwrap(),
1109        })
1110    }
1111}
1112
1113#[cfg(target_family = "unix")]
1114use std::os::unix::fs::FileExt;
1115
1116#[cfg(target_family = "unix")]
1117impl BasicStorage for UnixFileStorage {
1118    fn read(&self, start: u64, data: &mut [u8]) {
1119        let _ = self.f.read_at(data, start);
1120    }
1121
1122    fn write(&mut self, start: u64, data: &[u8]) {
1123        let _ = self.f.write_at(data, start);
1124    }
1125
1126    fn size(&self) -> u64 {
1127        *self.size.lock().unwrap()
1128    }
1129
1130    fn commit(&mut self, size: u64) {
1131        *self.size.lock().unwrap() = size;
1132        self.f.set_len(size).unwrap();
1133        self.f.sync_all().unwrap();
1134    }
1135}
1136
/// Optimized implementation of [Storage] ( windows only ), using positioned reads and writes.
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Committed size, shared with every clone of this storage.
    size: Arc<Mutex<u64>>,
    /// The open file.
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Open `filename` for reading and writing, creating it if it does not exist.
    /// Existing content is kept; the initial size is the current length of the file.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened or its end position cannot be determined.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut file = options.open(filename).unwrap();
        let end = file.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(end)),
            f: file,
        })
    }
}
1159
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Make another handle to the same file; the clone shares the committed size.
    ///
    /// # Panics
    ///
    /// Panics if the file handle cannot be duplicated.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1169
1170#[cfg(target_family = "windows")]
1171use std::os::windows::fs::FileExt;
1172
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Fill `data` with the bytes stored at offset `start`.
    ///
    /// Short reads are retried until `data` is full or end-of-file is reached. Any part of
    /// `data` that lies beyond end-of-file is left unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the operating system reports a read error.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage: read at offset {} failed: {}", start, e),
            }
        }
    }

    /// Write all of `data` at offset `start`.
    ///
    /// # Panics
    ///
    /// Panics if the data cannot be written in full. A silently lost or partial write
    /// followed by a successful commit would corrupt the file.
    fn write(&mut self, start: u64, data: &[u8]) {
        // seek_write may write only part of the buffer, so loop until everything is written.
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => panic!(
                    "WindowsFileStorage: write at offset {} failed: no bytes written",
                    start
                ),
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage: write at offset {} failed: {}", start, e),
            }
        }
    }

    /// Size set by the last commit ( initially the length of the file when opened ).
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record `size`, set the file length to `size` and sync the file to disk.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be resized or synced.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1193
1194/// Optimised Storage ( varies according to platform ).
1195#[cfg(target_family = "windows")]
1196pub type MultiFileStorage = WindowsFileStorage;
1197
1198/// Optimised Storage ( varies according to platform ).
1199#[cfg(target_family = "unix")]
1200pub type MultiFileStorage = UnixFileStorage;
1201
1202/// Optimised Storage ( varies according to platform ).
1203#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1204pub type MultiFileStorage = AnyFileStorage;
1205
1206/// Fast Storage for upd file ( varies according to platform ).
1207#[cfg(any(target_family = "windows", target_family = "unix"))]
1208pub type FastFileStorage = MultiFileStorage;
1209
1210/// Fast Storage for upd file ( varies according to platform ).
1211#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1212pub type FastFileStorage = UpdFileStorage;
1213
#[cfg(test)]
/// Amount of testing to perform, read from the environment variable `TA`.
/// Defaults to 1 when `TA` cannot be read; panics if its value is not a valid `usize`.
fn test_amount() -> usize {
    let raw = match std::env::var("TA") {
        Ok(value) => value,
        Err(_) => String::from("1"),
    };
    raw.parse::<usize>().unwrap()
}
1219
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Apply the same random reads, writes and commits to an AtomicFile and to a plain
    // MemFile, checking that every read returns the same bytes from both.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _round in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        // To test the non-async variant instead:
        // let mut s1 = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut s2 = MemFile::new();

        for _step in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            if rng.r#gen::<bool>() {
                // Write the same random bytes to both.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                s1.write(off, &bytes);
                s2.write(off, &bytes);
            } else {
                // Read the same range from both and compare.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off, &mut b2);
                s2.read(off, &mut b3);
                assert!(b2 == b3);
            }
            // Commit both, on average once every 50 operations.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}