// atom_file/lib.rs
1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4//!
5//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
6//!
7//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.
8//!
//! # Features
10//!
11//! This crate supports the following cargo features:
12//! - `pstd` : Use pstd crate for `BTreeMap` (allocated in `GTemp`).
13//! - `unsafe-optim` : Enable unsafe optimisations in release mode.
14
15#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{collections::{BTreeMapA,btree_map::CustomTuning}, localalloc::GTemp};
24
25#[cfg(feature = "pstd")]
26type BTreeMap<K,V> = BTreeMapA<K,V,CustomTuning<GTemp>>;
27
28#[cfg(not(feature = "pstd"))]
29use std::collections::BTreeMap;
30
31/// ```Arc<Vec<u8>>```
32pub type Data = Arc<Vec<u8>>;
33
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates are written here.
    map: WMap,
    /// Underlying file, with previous updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size.
    size: u64,
    /// For sending update maps to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by update process while it is active.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map.
    map_lim: usize,
}
64
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // Terminates when the sender (the AtomicFile) is dropped.
            while let Ok((size, map)) = rx.recv() {
                // Holding busy while saving lets wait_complete block until this commit lands.
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Tell the CommitFile one outstanding commit has been saved.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
106
107impl Storage for AtomicFile {
108    fn clone(&self) -> Box<dyn Storage> {
109        panic!()
110    }
111}
112
impl BasicStorage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Bound memory: if the CommitFile map has grown past the limit, wait for the
        // save thread to drain outstanding commits (done_one then resets the map).
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Latest (uncommitted) writes take priority; gaps are filled from the CommitFile.
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Acquiring busy blocks while the save thread is mid-commit;
        // loop until all outstanding commits have been saved.
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
155
/// Committed-but-unsaved updates plus buffered access to the underlying storage.
/// Shared between the [AtomicFile] and its save thread via `Arc<RwLock<..>>`.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
164
impl CommitFile {
    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
        Self {
            // 256-byte sectors, reads up to 50 bytes count as "small",
            // buf_mem / 256 buffers at most.
            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
            map: WMap::default(),
            todo: 0,
        }
    }

    /// Record that one outstanding commit has been saved. When none remain,
    /// discard the committed-update map and the read buffers.
    fn done_one(&mut self) {
        self.todo -= 1;
        if self.todo == 0 {
            self.map = WMap::default();
            self.stg.reset();
        }
    }
}
182
impl BasicStorage for CommitFile {
    /// Not supported - panics.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported - panics.
    fn size(&self) -> u64 {
        panic!()
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Committed-but-unsaved updates take priority over buffered storage.
        self.map.read(start, data, &self.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported - panics ( writes arrive via write_data ).
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
204
205/// Storage interface - BasicStorage is some kind of "file" storage.
206///
207/// read and write methods take a start which is a byte offset in the underlying file.
208pub trait BasicStorage: Send {
209    /// Get the size of the underlying storage.
210    /// Note : this is valid initially and after a commit but is not defined after write is called.
211    fn size(&self) -> u64;
212
213    /// Read data.
214    fn read(&self, start: u64, data: &mut [u8]);
215
216    /// Write byte slice to storage.
217    fn write(&mut self, start: u64, data: &[u8]);
218
219    /// Write byte Vec.
220    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
221        let len = data.len();
222        let d = Arc::new(data);
223        self.write_data(start, d, 0, len);
224    }
225
226    /// Write Data slice.
227    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
228        self.write(start, &data[off..off + len]);
229    }
230
231    /// Finish write transaction, size is new size of underlying storage.
232    fn commit(&mut self, size: u64);
233
234    /// Write u64.
235    fn write_u64(&mut self, start: u64, value: u64) {
236        self.write(start, &value.to_le_bytes());
237    }
238
239    /// Read u64.
240    fn read_u64(&self, start: u64) -> u64 {
241        let mut bytes = [0; 8];
242        self.read(start, &mut bytes);
243        u64::from_le_bytes(bytes)
244    }
245
246    /// Wait until current writes are complete.
247    fn wait_complete(&self) {}
248}
249
250/// BasicStorage with Sync and clone.
251pub trait Storage: BasicStorage + Sync {
252    /// Clone.
253    fn clone(&self) -> Box<dyn Storage>;
254}
255
/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
#[derive(Default)]
pub struct MemFile {
    // Shared backing bytes; clones share the same Vec via the Arc.
    v: Arc<Mutex<Vec<u8>>>,
}
261
262impl MemFile {
263    /// Get a new (boxed) MemFile.
264    pub fn new() -> Box<Self> {
265        Box::default()
266    }
267}
268
269impl Storage for MemFile {
270    fn clone(&self) -> Box<dyn Storage> {
271        Box::new(Self { v: self.v.clone() })
272    }
273}
274
275impl BasicStorage for MemFile {
276    fn size(&self) -> u64 {
277        let v = self.v.lock().unwrap();
278        v.len() as u64
279    }
280
281    fn read(&self, off: u64, bytes: &mut [u8]) {
282        let off = off as usize;
283        let len = bytes.len();
284        let mut v = self.v.lock().unwrap();
285        if off + len > v.len() {
286            v.resize(off + len, 0);
287        }
288        bytes.copy_from_slice(&v[off..off + len]);
289    }
290
291    fn write(&mut self, off: u64, bytes: &[u8]) {
292        let off = off as usize;
293        let len = bytes.len();
294        let mut v = self.v.lock().unwrap();
295        if off + len > v.len() {
296            v.resize(off + len, 0);
297        }
298        v[off..off + len].copy_from_slice(bytes);
299    }
300
301    fn commit(&mut self, size: u64) {
302        let mut v = self.v.lock().unwrap();
303        v.resize(size as usize, 0);
304    }
305}
306
307use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
308
/// Wrapper around [`std::fs::File`] providing offset-based read/write/commit.
struct FileInner {
    f: fs::File,
}
312
313impl FileInner {
314    /// Construct from filename.
315    pub fn new(filename: &str) -> Self {
316        Self {
317            f: OpenOptions::new()
318                .read(true)
319                .write(true)
320                .create(true)
321                .truncate(false)
322                .open(filename)
323                .unwrap(),
324        }
325    }
326
327    fn size(&mut self) -> u64 {
328        self.f.seek(SeekFrom::End(0)).unwrap()
329    }
330
331    fn read(&mut self, off: u64, bytes: &mut [u8]) {
332        self.f.seek(SeekFrom::Start(off)).unwrap();
333        let _ = self.f.read(bytes).unwrap();
334    }
335
336    fn write(&mut self, off: u64, bytes: &[u8]) {
337        // The list of operating systems which auto-zero is likely more than this...research is todo.
338        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
339        {
340            let size = self.f.seek(SeekFrom::End(0)).unwrap();
341            if off > size {
342                self.f.set_len(off).unwrap();
343            }
344        }
345        self.f.seek(SeekFrom::Start(off)).unwrap();
346        let _ = self.f.write(bytes).unwrap();
347    }
348
349    fn commit(&mut self, size: u64) {
350        self.f.set_len(size).unwrap();
351        self.f.sync_all().unwrap();
352    }
353}
354
/// For atomic upd file, if not unix or windows.
///
/// Not `Sync` ( the handle lives in a `Cell` and is moved out for the duration of each operation ).
pub struct UpdFileStorage {
    /// Always `Some` except while an operation is in progress.
    file: Cell<Option<FileInner>>,
}
359
360impl UpdFileStorage {
361    /// Construct from filename.
362    pub fn new(filename: &str) -> Box<Self> {
363        Box::new(Self {
364            file: Cell::new(Some(FileInner::new(filename))),
365        })
366    }
367}
368
369impl BasicStorage for UpdFileStorage {
370    fn size(&self) -> u64 {
371        let mut f = self.file.take().unwrap();
372        let result = f.size();
373        self.file.set(Some(f));
374        result
375    }
376    fn read(&self, off: u64, bytes: &mut [u8]) {
377        let mut f = self.file.take().unwrap();
378        f.read(off, bytes);
379        self.file.set(Some(f));
380    }
381
382    fn write(&mut self, off: u64, bytes: &[u8]) {
383        let mut f = self.file.take().unwrap();
384        f.write(off, bytes);
385        self.file.set(Some(f));
386    }
387
388    fn commit(&mut self, size: u64) {
389        let mut f = self.file.take().unwrap();
390        f.commit(size);
391        self.file.set(Some(f));
392    }
393}
394
/// Simple implementation of [Storage] using [`std::fs::File`].
pub struct SimpleFileStorage {
    // Shared handle; clones operate on the same file, serialised by the Mutex.
    file: Arc<Mutex<FileInner>>,
}
399
400impl SimpleFileStorage {
401    /// Construct from filename.
402    pub fn new(filename: &str) -> Box<Self> {
403        Box::new(Self {
404            file: Arc::new(Mutex::new(FileInner::new(filename))),
405        })
406    }
407}
408
409impl Storage for SimpleFileStorage {
410    fn clone(&self) -> Box<dyn Storage> {
411        Box::new(Self {
412            file: self.file.clone(),
413        })
414    }
415}
416
417impl BasicStorage for SimpleFileStorage {
418    fn size(&self) -> u64 {
419        self.file.lock().unwrap().size()
420    }
421
422    fn read(&self, off: u64, bytes: &mut [u8]) {
423        self.file.lock().unwrap().read(off, bytes);
424    }
425
426    fn write(&mut self, off: u64, bytes: &[u8]) {
427        self.file.lock().unwrap().write(off, bytes);
428    }
429
430    fn commit(&mut self, size: u64) {
431        self.file.lock().unwrap().commit(size);
432    }
433}
434
/// Alternative to SimpleFileStorage that keeps a pool of open file handles to allow parallel reads by different threads.
pub struct AnyFileStorage {
    /// Name of the file; used to open additional handles on demand.
    filename: String,
    /// Pool of open file handles.
    files: Arc<Mutex<Vec<FileInner>>>,
}
440
441impl AnyFileStorage {
442    /// Create new.
443    pub fn new(filename: &str) -> Box<Self> {
444        Box::new(Self {
445            filename: filename.to_owned(),
446            files: Arc::new(Mutex::new(Vec::new())),
447        })
448    }
449
450    fn get_file(&self) -> FileInner {
451        match self.files.lock().unwrap().pop() {
452            Some(f) => f,
453            _ => FileInner::new(&self.filename),
454        }
455    }
456
457    fn put_file(&self, f: FileInner) {
458        self.files.lock().unwrap().push(f);
459    }
460}
461
462impl Storage for AnyFileStorage {
463    fn clone(&self) -> Box<dyn Storage> {
464        Box::new(Self {
465            filename: self.filename.clone(),
466            files: self.files.clone(),
467        })
468    }
469}
470
471impl BasicStorage for AnyFileStorage {
472    fn size(&self) -> u64 {
473        let mut f = self.get_file();
474        let result = f.size();
475        self.put_file(f);
476        result
477    }
478
479    fn read(&self, off: u64, bytes: &mut [u8]) {
480        let mut f = self.get_file();
481        f.read(off, bytes);
482        self.put_file(f);
483    }
484
485    fn write(&mut self, off: u64, bytes: &[u8]) {
486        let mut f = self.get_file();
487        f.write(off, bytes);
488        self.put_file(f);
489    }
490
491    fn commit(&mut self, size: u64) {
492        let mut f = self.get_file();
493        f.commit(size);
494        self.put_file(f);
495    }
496}
497
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
pub struct DummyFile {}
500impl DummyFile {
501    /// Construct.
502    pub fn new() -> Box<Self> {
503        Box::new(Self {})
504    }
505}
506
507impl Storage for DummyFile {
508    fn clone(&self) -> Box<dyn Storage> {
509        Self::new()
510    }
511}
512
513impl BasicStorage for DummyFile {
514    fn size(&self) -> u64 {
515        0
516    }
517
518    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
519
520    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
521
522    fn commit(&mut self, _size: u64) {}
523}
524
/// Memory configuration limits for [`AtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 ( 1MB ).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 ( 1MB ).
    pub uwbuf: usize,
}
537
538impl Default for Limits {
539    fn default() -> Self {
540        Self {
541            map_lim: 5000,
542            rbuf_mem: 0x200000,
543            swbuf: 0x100000,
544            uwbuf: 0x100000,
545        }
546    }
547}
548
/// Write Buffer: coalesces sequential writes before passing them to the underlying storage.
struct WriteBuffer {
    /// Current write index into buf.
    ix: usize,
    /// File position of the start of buf ( u64::MAX when no position is set ).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}
560
impl WriteBuffer {
    /// Construct with the given buffer size.
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX, // No buffered position yet.
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset. Sequential writes accumulate in the buffer.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // Non-sequential write: flush buffered data and restart the buffer at off.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // Space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: flush, then the whole buffer is available again.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write buffered data to the underlying storage; new_pos is the file
    /// position that subsequently buffered bytes will correspond to.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Commit: flush buffered writes, then commit the underlying storage.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 ( little-endian ).
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
614
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers ( sector cache ), guarded so concurrent readers can share it.
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}
628
629impl<const N: usize> Drop for ReadBufStg<N> {
630    fn drop(&mut self) {
631        self.reset();
632    }
633}
634
635impl<const N: usize> ReadBufStg<N> {
636    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
637    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
638        Self {
639            stg,
640            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
641            limit,
642        }
643    }
644
645    /// Clears the buffers.
646    fn reset(&mut self) {
647        self.buf.lock().unwrap().reset();
648    }
649}
650
651impl<const N: usize> BasicStorage for ReadBufStg<N> {
652    /// Read data from storage.
653    fn read(&self, start: u64, data: &mut [u8]) {
654        if data.len() <= self.limit {
655            self.buf.lock().unwrap().read(&*self.stg, start, data);
656        } else {
657            self.stg.read(start, data);
658        }
659    }
660
661    /// Panics.
662    fn size(&self) -> u64 {
663        panic!()
664    }
665
666    /// Panics.
667    fn write(&mut self, _start: u64, _data: &[u8]) {
668        panic!();
669    }
670
671    /// Panics.
672    fn commit(&mut self, _size: u64) {
673        panic!();
674    }
675}
676
/// Cache of N-byte sector buffers used by [ReadBufStg].
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}
683
impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read via the sector cache; missing sectors are loaded whole from stg.
    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            // Absolute offset of the next byte to read.
            let off = off + done as u64;
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            // Bytes available in this sector starting at disp.
            let amount = min(data.len() - done, N - disp);

            let p = self.map.entry(sector).or_insert_with(|| {
                // Cache miss: load the whole sector from the underlying storage.
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction: once the cache is full, drop it entirely.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
717
#[derive(Default)]
/// Slice of Data to be written to storage.
struct DataSlice {
    /// Slice data.
    pub data: Data,
    /// Start of slice ( offset into data ).
    pub off: usize,
    /// Length of slice.
    pub len: usize,
}
728
729impl DataSlice {
730    /// Get reference to the whole slice.
731    pub fn all(&self) -> &[u8] {
732        &self.data[self.off..self.off + self.len]
733    }
734    /// Get reference to part of slice.
735    pub fn part(&self, off: usize, len: usize) -> &[u8] {
736        &self.data[self.off + off..self.off + off + len]
737    }
738    /// Trim specified amount from start of slice.
739    pub fn trim(&mut self, trim: usize) {
740        self.off += trim;
741        self.len -= trim;
742    }
743    /// Take the data.
744    #[allow(dead_code)]
745    pub fn take(&mut self) -> Data {
746        std::mem::take(&mut self.data)
747    }
748}
749
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end offset of the slice ( start = end - len ).
    map: BTreeMap<u64, DataSlice>,
}
756
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            // Key is the end of the slice; recover the start offset.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only writes with end > start can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Removals and insertions are deferred, as the map cannot be modified while iterating.
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first write with end > start.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Only writes with end > start can overlap the read.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Tail beyond the last overlapping write comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
907
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes ( populated from map during commit phase 1 ).
    list: Vec<(u64, DataSlice)>,
    /// Current committed size of the main storage.
    size: u64,
}
920
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates recorded in upd ( completes an interrupted commit ).
    ///
    /// upd layout: bytes [0..8) end position of records ( 0 = none ), [8..16) new
    /// file size, then repeated ( start:u64, len:u64, data ) records.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // No outstanding updates.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Clear the update file.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    /// Phase 1 records the updates in upd ( so they survive interruption );
    /// phase 2 applies them to the main storage and clears upd.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this makes the recorded updates "live" for init ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            // Clear the update file: commit is complete.
            self.upd.commit(0);
        }
    }
}
1013
1014impl BasicStorage for BasicAtomicFile {
1015    fn commit(&mut self, size: u64) {
1016        self.commit_phase(size, 1);
1017        self.commit_phase(size, 2);
1018        self.size = size;
1019    }
1020
1021    fn size(&self) -> u64 {
1022        self.size
1023    }
1024
1025    fn read(&self, start: u64, data: &mut [u8]) {
1026        self.map.read(start, data, &*self.stg.stg);
1027    }
1028
1029    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1030        self.map.write(start, data, off, len);
1031    }
1032
1033    fn write(&mut self, start: u64, data: &[u8]) {
1034        let len = data.len();
1035        let d = Arc::new(data.to_vec());
1036        self.write_data(start, d, 0, len);
1037    }
1038}
1039
/// Optimized implementation of [Storage] ( unix only ).
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// Logical file size; shared via Arc so clones observe commits.
    size: Arc<Mutex<u64>>,
    /// Underlying operating system file handle.
    f: fs::File,
}
1046#[cfg(target_family = "unix")]
1047impl UnixFileStorage {
1048    /// Construct from filename.
1049    pub fn new(filename: &str) -> Box<Self> {
1050        let mut f = OpenOptions::new()
1051            .read(true)
1052            .write(true)
1053            .create(true)
1054            .truncate(false)
1055            .open(filename)
1056            .unwrap();
1057        let size = f.seek(SeekFrom::End(0)).unwrap();
1058        let size = Arc::new(Mutex::new(size));
1059        Box::new(Self { size, f })
1060    }
1061}
1062
1063#[cfg(target_family = "unix")]
1064impl Storage for UnixFileStorage {
1065    fn clone(&self) -> Box<dyn Storage> {
1066        Box::new(Self {
1067            size: self.size.clone(),
1068            f: self.f.try_clone().unwrap(),
1069        })
1070    }
1071}
1072
1073#[cfg(target_family = "unix")]
1074use std::os::unix::fs::FileExt;
1075
1076#[cfg(target_family = "unix")]
1077impl BasicStorage for UnixFileStorage {
1078    fn read(&self, start: u64, data: &mut [u8]) {
1079        let _ = self.f.read_at(data, start);
1080    }
1081
1082    fn write(&mut self, start: u64, data: &[u8]) {
1083        let _ = self.f.write_at(data, start);
1084    }
1085
1086    fn size(&self) -> u64 {
1087        *self.size.lock().unwrap()
1088    }
1089
1090    fn commit(&mut self, size: u64) {
1091        *self.size.lock().unwrap() = size;
1092        self.f.set_len(size).unwrap();
1093        self.f.sync_all().unwrap();
1094    }
1095}
1096
/// Optimized implementation of [Storage] ( windows only ).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Logical file size; shared via Arc so clones observe commits.
    size: Arc<Mutex<u64>>,
    /// Underlying operating system file handle.
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename.
    ///
    /// Creates the file if it does not exist ( never truncates ). The
    /// initial logical size is the file's current length on disk.
    ///
    /// # Panics
    /// Panics if the file cannot be opened or its metadata cannot be read.
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // metadata().len() gets the size without seeking, avoiding a
        // mutable binding and leaving the file cursor untouched.
        let size = f.metadata().unwrap().len();
        let size = Arc::new(Mutex::new(size));
        Box::new(Self { size, f })
    }
}
1119
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Clone this storage: the size cell is shared, the OS file handle
    /// is duplicated.
    fn clone(&self) -> Box<dyn Storage> {
        let f = self.f.try_clone().unwrap();
        let size = Arc::clone(&self.size);
        Box::new(Self { size, f })
    }
}
1129
1130#[cfg(target_family = "windows")]
1131use std::os::windows::fs::FileExt;
1132
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read `data.len()` bytes at offset `start`.
    ///
    /// seek_read may return fewer bytes than requested, so loop until the
    /// buffer is full, EOF ( Ok(0) ) or an error occurs. Errors are
    /// ignored ( best effort, matching the original contract ); on EOF or
    /// error the unread tail of `data` keeps its prior contents.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) | Err(_) => break,
                Ok(n) => done += n,
            }
        }
    }

    /// Write `data` at offset `start`.
    ///
    /// seek_write may perform a short write, so loop until the whole
    /// buffer is written or an error occurs ( errors are ignored ).
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) | Err(_) => break,
                Ok(n) => done += n,
            }
        }
    }

    /// Current logical size ( may differ from the on-disk length until commit ).
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Set the logical and on-disk size, then flush data and metadata to disk.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1153
/// Optimised Storage ( varies according to platform ).
/// On windows this is [WindowsFileStorage].
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Optimised Storage ( varies according to platform ).
/// On unix this is [UnixFileStorage].
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Optimised Storage ( varies according to platform ).
/// Fallback for platforms that are neither unix nor windows.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Fast Storage for upd file ( varies according to platform ).
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Fast Storage for upd file ( varies according to platform ).
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1173
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 ).
///
/// # Panics
/// Panics if TA is set but does not parse as a usize.
fn test_amount() -> usize {
    // unwrap_or_else avoids allocating the default string when TA is set;
    // method-call parse reads more idiomatically than str::parse(&..).
    std::env::var("TA")
        .unwrap_or_else(|_| "1".to_string())
        .parse()
        .unwrap()
}
1179
#[test]
fn test_atomic_file() {
    use rand::Rng;
    /* Idea of test is to check AtomicFile and MemFile behave the same */

    // TA environment variable scales how much work the test does.
    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        // s1 is the implementation under test, s2 is the reference model.
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        // let mut s1 = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random offset in 0..100 and length in 1..=20.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            // Coin flip: write or read.
            let w: bool = rng.r#gen();
            if w {
                // Write the same random bytes to both files.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read from both files and check the results agree.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
            // Occasionally ( ~1 in 50 iterations ) commit both at size 200.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}