Skip to main content

atom_file/
lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4//!
5//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
6//!
7//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.
8//!
//! # Features
10//!
11//! This crate supports the following cargo features:
12//! - `pstd` : Use pstd crate for `BTreeMap` (allocated in `GTemp`).
13//! - `unsafe-optim` : Enable unsafe optimisations in release mode.
14
15#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24    VecA,
25    collections::{BTreeMapA, btree_map::CustomTuning},
26    localalloc::GTemp,
27    veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
/// With the `pstd` feature, `BTreeMap` is the pstd B-tree map using `CustomTuning<GTemp>`.
#[cfg(feature = "pstd")]
type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;

/// With the `pstd` feature, `GVec` is the pstd vector `VecA` parameterised with `GTemp`.
#[cfg(feature = "pstd")]
type GVec<T> = VecA<T, GTemp>;
38
/// Shared byte buffer: `Arc<Vec<u8>>`.
pub type Data = Arc<Vec<u8>>;
41
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes is sent to be written to underlying
/// storage, and is also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates are written here.
    map: WMap,
    /// Underlying file, with previous updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size (as passed to the most recent commit, or read from storage at construction).
    size: u64,
    /// For sending update maps (paired with the committed size) to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by update process while it is active.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map.
    map_lim: usize,
}
72
73impl AtomicFile {
74    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
75    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76        Self::new_with_limits(stg, upd, &Limits::default())
77    }
78
79    /// Construct Atomic file with specified limits.
80    pub fn new_with_limits(
81        stg: Box<dyn Storage>,
82        upd: Box<dyn BasicStorage>,
83        lim: &Limits,
84    ) -> Box<Self> {
85        let size = stg.size();
86        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.
91
92        // Start the thread which does save asyncronously.
93        let (cf1, busy1) = (cf.clone(), busy.clone());
94
95        std::thread::spawn(move || {
96            // Loop that recieves a map of updates and applies it to BasicAtomicFile.
97            while let Ok((size, map)) = rx.recv() {
98                let _lock = busy1.lock();
99                baf.map = map;
100                baf.commit(size);
101                cf1.write().unwrap().done_one();
102            }
103        });
104        Box::new(Self {
105            map: WMap::default(),
106            cf,
107            size,
108            tx,
109            busy,
110            map_lim: lim.map_lim,
111        })
112    }
113}
114
impl Storage for AtomicFile {
    /// Not supported for AtomicFile: always panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
120
121impl BasicStorage for AtomicFile {
122    fn commit(&mut self, size: u64) {
123        self.size = size;
124        if self.map.is_empty() {
125            return;
126        }
127        if self.cf.read().unwrap().map.len() > self.map_lim {
128            self.wait_complete();
129        }
130        let map = std::mem::take(&mut self.map);
131        let stop =
132        {
133            let cf = &mut *self.cf.write().unwrap();
134            if cf.stop { true }
135            else
136            {
137                cf.todo += 1;
138                // Apply map of updates to CommitFile.
139                map.to_storage(cf);
140                // Send map of updates to thread to be written to underlying storage.
141                self.tx.send((size, map)).unwrap();
142                false
143            }
144        };
145        while stop {
146            // Program is terminating, loop forever.
147            std::thread::sleep(std::time::Duration::from_millis(100));
148        }
149    }
150
151    fn size(&self) -> u64 {
152        self.size
153    }
154
155    fn read(&self, start: u64, data: &mut [u8]) {
156        self.map.read(start, data, &*self.cf.read().unwrap());
157    }
158
159    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
160        self.map.write(start, data, off, len);
161    }
162
163    fn write(&mut self, start: u64, data: &[u8]) {
164        let len = data.len();
165        let d = Arc::new(data.to_vec());
166        self.write_data(start, d, 0, len);
167    }
168
169    fn wait_complete(&self) {
170       while self.cf.read().unwrap().todo != 0 {
171           let _x = self.busy.lock();
172       }
173    }   
174
175    fn shutdown(&mut self) {
176        self.cf.write().unwrap().stop = true; // Prevents new commits from being added.
177        self.wait_complete();
178    }       
179}
180
/// Underlying storage (read-buffered) overlaid with the map of committed but not yet saved writes.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
    /// Flag to prevent new commits starting.
    stop: bool,
}
191
192impl CommitFile {
193    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
194        Self {
195            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
196            map: WMap::default(),
197            todo: 0,
198            stop: false,
199        }
200    }
201
202    fn done_one(&mut self) {
203        self.todo -= 1;
204        if self.todo == 0 {
205            self.map = WMap::default();
206            self.stg.reset();
207        }
208    }
209}
210
impl BasicStorage for CommitFile {
    /// Not supported: panics.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported: panics.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Read, taking the map of committed updates into account; the rest comes from `stg`.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &self.stg);
    }

    /// Record `data[off..off + len]` at `start` in the map of committed updates.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported: panics (only `write_data` is used to add updates).
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
232
233/// Storage interface - BasicStorage is some kind of "file" storage.
234///
235/// read and write methods take a start which is a byte offset in the underlying file.
236pub trait BasicStorage: Send {
237    /// Get the size of the underlying storage.
238    /// Note : this is valid initially and after a commit but is not defined after write is called.
239    fn size(&self) -> u64;
240
241    /// Read data.
242    fn read(&self, start: u64, data: &mut [u8]);
243
244    /// Write byte slice to storage.
245    fn write(&mut self, start: u64, data: &[u8]);
246
247    /// Write byte Vec.
248    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
249        let len = data.len();
250        let d = Arc::new(data);
251        self.write_data(start, d, 0, len);
252    }
253
254    /// Write Data slice.
255    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
256        self.write(start, &data[off..off + len]);
257    }
258
259    /// Finish write transaction, size is new size of underlying storage.
260    fn commit(&mut self, size: u64);
261
262    /// Write u64.
263    fn write_u64(&mut self, start: u64, value: u64) {
264        self.write(start, &value.to_le_bytes());
265    }
266
267    /// Read u64.
268    fn read_u64(&self, start: u64) -> u64 {
269        let mut bytes = [0; 8];
270        self.read(start, &mut bytes);
271        u64::from_le_bytes(bytes)
272    }
273
274    /// Wait until current writes are complete.
275    fn wait_complete(&self){}
276
277    /// Called on program termination.
278    fn shutdown(&mut self){}
279}
280
/// BasicStorage with Sync and clone.
pub trait Storage: BasicStorage + Sync {
    /// Clone, returning a boxed [Storage].
    fn clone(&self) -> Box<dyn Storage>;
}
286
287/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
288#[derive(Default)]
289pub struct MemFile {
290    v: Arc<Mutex<Vec<u8>>>,
291}
292
293impl MemFile {
294    /// Get a new (boxed) MemFile.
295    pub fn new() -> Box<Self> {
296        Box::default()
297    }
298}
299
300impl Storage for MemFile {
301    fn clone(&self) -> Box<dyn Storage> {
302        Box::new(Self { v: self.v.clone() })
303    }
304}
305
306impl BasicStorage for MemFile {
307    fn size(&self) -> u64 {
308        let v = self.v.lock().unwrap();
309        v.len() as u64
310    }
311
312    fn read(&self, off: u64, bytes: &mut [u8]) {
313        let off = off as usize;
314        let len = bytes.len();
315        let mut v = self.v.lock().unwrap();
316        if off + len > v.len() {
317            v.resize(off + len, 0);
318        }
319        bytes.copy_from_slice(&v[off..off + len]);
320    }
321
322    fn write(&mut self, off: u64, bytes: &[u8]) {
323        let off = off as usize;
324        let len = bytes.len();
325        let mut v = self.v.lock().unwrap();
326        if off + len > v.len() {
327            v.resize(off + len, 0);
328        }
329        v[off..off + len].copy_from_slice(bytes);
330    }
331
332    fn commit(&mut self, size: u64) {
333        let mut v = self.v.lock().unwrap();
334        v.resize(size as usize, 0);
335    }
336}
337
338use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
339
/// Thin wrapper around a [`std::fs::File`] providing positioned read/write, size and commit.
struct FileInner {
    /// The open file.
    f: fs::File,
}

impl FileInner {
    /// Construct from filename.
    ///
    /// The file is opened for reading and writing, created if it does not exist, and not truncated.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Size of the file in bytes (found by seeking to the end).
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read into `bytes` starting at offset `off`.
    ///
    /// A single `Read::read` call may return fewer bytes than requested, so reading is
    /// repeated until `bytes` is full or end-of-file is reached. Any part of `bytes` beyond
    /// end-of-file is left unchanged.
    ///
    /// # Panics
    ///
    /// Panics on an I/O error (other than an interrupted read, which is retried).
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut filled = 0;
        while filled < bytes.len() {
            match self.f.read(&mut bytes[filled..]) {
                // End of file: nothing more to read.
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file read failed: {e}"),
            }
        }
    }

    /// Write all of `bytes` at offset `off`.
    ///
    /// # Panics
    ///
    /// Panics on an I/O error.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // `write_all` retries short writes; a bare `write` may store only part of `bytes`.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to `size` and sync data and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
385
386/// For atomic upd file, if not unix or windows.
387pub struct UpdFileStorage {
388    file: Cell<Option<FileInner>>,
389}
390
391impl UpdFileStorage {
392    /// Construct from filename.
393    pub fn new(filename: &str) -> Box<Self> {
394        Box::new(Self {
395            file: Cell::new(Some(FileInner::new(filename))),
396        })
397    }
398}
399
400impl BasicStorage for UpdFileStorage {
401    fn size(&self) -> u64 {
402        let mut f = self.file.take().unwrap();
403        let result = f.size();
404        self.file.set(Some(f));
405        result
406    }
407    fn read(&self, off: u64, bytes: &mut [u8]) {
408        let mut f = self.file.take().unwrap();
409        f.read(off, bytes);
410        self.file.set(Some(f));
411    }
412
413    fn write(&mut self, off: u64, bytes: &[u8]) {
414        let mut f = self.file.take().unwrap();
415        f.write(off, bytes);
416        self.file.set(Some(f));
417    }
418
419    fn commit(&mut self, size: u64) {
420        let mut f = self.file.take().unwrap();
421        f.commit(size);
422        self.file.set(Some(f));
423    }
424}
425
426/// Simple implementation of [Storage] using [`std::fs::File`].
427pub struct SimpleFileStorage {
428    file: Arc<Mutex<FileInner>>,
429}
430
431impl SimpleFileStorage {
432    /// Construct from filename.
433    pub fn new(filename: &str) -> Box<Self> {
434        Box::new(Self {
435            file: Arc::new(Mutex::new(FileInner::new(filename))),
436        })
437    }
438}
439
440impl Storage for SimpleFileStorage {
441    fn clone(&self) -> Box<dyn Storage> {
442        Box::new(Self {
443            file: self.file.clone(),
444        })
445    }
446}
447
448impl BasicStorage for SimpleFileStorage {
449    fn size(&self) -> u64 {
450        self.file.lock().unwrap().size()
451    }
452
453    fn read(&self, off: u64, bytes: &mut [u8]) {
454        self.file.lock().unwrap().read(off, bytes);
455    }
456
457    fn write(&mut self, off: u64, bytes: &[u8]) {
458        self.file.lock().unwrap().write(off, bytes);
459    }
460
461    fn commit(&mut self, size: u64) {
462        self.file.lock().unwrap().commit(size);
463    }
464}
465
466/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
467pub struct AnyFileStorage {
468    filename: String,
469    files: Arc<Mutex<Vec<FileInner>>>,
470}
471
472impl AnyFileStorage {
473    /// Create new.
474    pub fn new(filename: &str) -> Box<Self> {
475        Box::new(Self {
476            filename: filename.to_owned(),
477            files: Arc::new(Mutex::new(Vec::new())),
478        })
479    }
480
481    fn get_file(&self) -> FileInner {
482        match self.files.lock().unwrap().pop() {
483            Some(f) => f,
484            _ => FileInner::new(&self.filename),
485        }
486    }
487
488    fn put_file(&self, f: FileInner) {
489        self.files.lock().unwrap().push(f);
490    }
491}
492
493impl Storage for AnyFileStorage {
494    fn clone(&self) -> Box<dyn Storage> {
495        Box::new(Self {
496            filename: self.filename.clone(),
497            files: self.files.clone(),
498        })
499    }
500}
501
502impl BasicStorage for AnyFileStorage {
503    fn size(&self) -> u64 {
504        let mut f = self.get_file();
505        let result = f.size();
506        self.put_file(f);
507        result
508    }
509
510    fn read(&self, off: u64, bytes: &mut [u8]) {
511        let mut f = self.get_file();
512        f.read(off, bytes);
513        self.put_file(f);
514    }
515
516    fn write(&mut self, off: u64, bytes: &[u8]) {
517        let mut f = self.get_file();
518        f.write(off, bytes);
519        self.put_file(f);
520    }
521
522    fn commit(&mut self, size: u64) {
523        let mut f = self.get_file();
524        f.commit(size);
525        self.put_file(f);
526    }
527}
528
529/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
530pub struct DummyFile {}
531impl DummyFile {
532    /// Construct.
533    pub fn new() -> Box<Self> {
534        Box::new(Self {})
535    }
536}
537
538impl Storage for DummyFile {
539    fn clone(&self) -> Box<dyn Storage> {
540        Self::new()
541    }
542}
543
impl BasicStorage for DummyFile {
    /// Always zero.
    fn size(&self) -> u64 {
        0
    }

    /// Does nothing; `_bytes` is left unchanged.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// Does nothing; the data is discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// Does nothing.
    fn commit(&mut self, _size: u64) {}
}
555
/// Memory configuration limits for [`AtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    /// The default limits, as documented on each field.
    fn default() -> Self {
        /// One megabyte ( 0x100000 bytes ).
        const MB: usize = 1 << 20;
        Self {
            map_lim: 5000,
            rbuf_mem: 2 * MB,
            swbuf: MB,
            uwbuf: MB,
        }
    }
}
579
/// Write Buffer: collects contiguous writes in `buf` and passes them to `stg` when flushed.
struct WriteBuffer {
    /// Current write index into buf (number of buffered bytes).
    ix: usize,
    /// Current file position: the storage offset corresponding to `buf[0]`.
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}
591
592impl WriteBuffer {
593    /// Construct.
594    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
595        Self {
596            ix: 0,
597            pos: u64::MAX,
598            stg,
599            buf: vec![0; buf_size],
600        }
601    }
602
603    /// Write data to specified offset,
604    pub fn write(&mut self, off: u64, data: &[u8]) {
605        if self.pos + self.ix as u64 != off {
606            self.flush(off);
607        }
608        let mut done: usize = 0;
609        let mut todo: usize = data.len();
610        while todo > 0 {
611            let mut n: usize = self.buf.len() - self.ix;
612            if n == 0 {
613                self.flush(off + done as u64);
614                n = self.buf.len();
615            }
616            if n > todo {
617                n = todo;
618            }
619            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
620            todo -= n;
621            done += n;
622            self.ix += n;
623        }
624    }
625
626    fn flush(&mut self, new_pos: u64) {
627        if self.ix > 0 {
628            self.stg.write(self.pos, &self.buf[0..self.ix]);
629        }
630        self.ix = 0;
631        self.pos = new_pos;
632    }
633
634    /// Commit.
635    pub fn commit(&mut self, size: u64) {
636        self.flush(u64::MAX);
637        self.stg.commit(size);
638    }
639
640    /// Write u64.
641    pub fn write_u64(&mut self, start: u64, value: u64) {
642        self.write(start, &value.to_le_bytes());
643    }
644}
645
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers (behind a mutex so that `read`, which takes `&self`, can update them).
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}
659
impl<const N: usize> Drop for ReadBufStg<N> {
    /// Clears the buffers when the ReadBufStg is dropped.
    fn drop(&mut self) {
        self.reset();
    }
}
665
666impl<const N: usize> ReadBufStg<N> {
667    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
668    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
669        Self {
670            stg,
671            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
672            limit,
673        }
674    }
675
676    /// Clears the buffers.
677    fn reset(&mut self) {
678        self.buf.lock().unwrap().reset();
679    }
680}
681
682impl<const N: usize> BasicStorage for ReadBufStg<N> {
683    /// Read data from storage.
684    fn read(&self, start: u64, data: &mut [u8]) {
685        if data.len() <= self.limit {
686            self.buf.lock().unwrap().read(&*self.stg, start, data);
687        } else {
688            self.stg.read(start, data);
689        }
690    }
691
692    /// Panics.
693    fn size(&self) -> u64 {
694        panic!()
695    }
696
697    /// Panics.
698    fn write(&mut self, _start: u64, _data: &[u8]) {
699        panic!();
700    }
701
702    /// Panics.
703    fn commit(&mut self, _size: u64) {
704        panic!();
705    }
706}
707
708struct ReadBuffer<const N: usize> {
709    /// Maps sector mumbers cached buffers.
710    map: HashMap<u64, Box<[u8; N]>>,
711    /// Maximum number of buffers.
712    max_buf: usize,
713}
714
715impl<const N: usize> ReadBuffer<N> {
716    fn new(max_buf: usize) -> Self {
717        Self {
718            map: HashMap::default(),
719            max_buf,
720        }
721    }
722
723    fn reset(&mut self) {
724        self.map.clear();
725    }
726
727    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
728        let mut done = 0;
729        while done < data.len() {
730            let off = off + done as u64;
731            let sector = off / N as u64;
732            let disp = (off % N as u64) as usize;
733            let amount = min(data.len() - done, N - disp);
734
735            let p = self.map.entry(sector).or_insert_with(|| {
736                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
737                stg.read(sector * N as u64, &mut *p);
738                p
739            });
740            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
741            done += amount;
742        }
743        if self.map.len() >= self.max_buf {
744            self.reset();
745        }
746    }
747}
748
749#[derive(Default)]
750/// Slice of Data to be written to storage.
751struct DataSlice {
752    /// Slice data.
753    pub data: Data,
754    /// Start of slice.
755    pub off: usize,
756    /// Length of slice.
757    pub len: usize,
758}
759
760impl DataSlice {
761    /// Get reference to the whole slice.
762    pub fn all(&self) -> &[u8] {
763        &self.data[self.off..self.off + self.len]
764    }
765    /// Get reference to part of slice.
766    pub fn part(&self, off: usize, len: usize) -> &[u8] {
767        &self.data[self.off + off..self.off + off + len]
768    }
769    /// Trim specified amount from start of slice.
770    pub fn trim(&mut self, trim: usize) {
771        self.off += trim;
772        self.len -= trim;
773    }
774    /// Take the data.
775    #[allow(dead_code)]
776    pub fn take(&mut self) -> Data {
777        std::mem::take(&mut self.data)
778    }
779}
780
#[derive(Default)]
/// Updateable store based on some underlying storage.
///
/// Holds a map of written slices; reads consult the map and fall back to a supplied storage.
struct WMap {
    /// Map of writes. Key is the end of the slice (start is key minus the slice length).
    map: BTreeMap<u64, DataSlice>,
}
787
788impl WMap {
789    /// Is the map empty?
790    pub fn is_empty(&self) -> bool {
791        self.map.is_empty()
792    }
793
794    /// Number of key-value pairs in the map.
795    pub fn len(&self) -> usize {
796        self.map.len()
797    }
798
799    /// Take the map and convert it to a Vec.
800    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
801        let map = std::mem::take(&mut self.map);
802        let mut result = GVec::with_capacity(map.len());
803        for (end, v) in map {
804            let start = end - v.len as u64;
805            result.push((start, v));
806        }
807        result
808    }
809
810    /// Write the map into storage.
811    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
812        for (end, v) in self.map.iter() {
813            let start = end - v.len as u64;
814            stg.write_data(start, v.data.clone(), v.off, v.len);
815        }
816    }
817
818    #[cfg(not(feature = "pstd"))]
819    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
820    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
821        if len != 0 {
822            let (mut insert, mut remove) = (Vec::new(), Vec::new());
823            let end = start + len as u64;
824            for (ee, v) in self.map.range_mut(start + 1..) {
825                let ee = *ee;
826                let es = ee - v.len as u64; // Existing write Start.
827                if es >= end {
828                    // Existing write starts after end of new write, nothing to do.
829                    break;
830                } else if start <= es {
831                    if end < ee {
832                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
833                        v.trim((end - es) as usize);
834                        break;
835                    }
836                    // New write subsumes existing write entirely, remove existing write.
837                    remove.push(ee);
838                } else if end < ee {
839                    // New write starts in middle of existing write, ends before end of existing write,
840                    // put start of existing write in insert list, trim existing write.
841                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
842                    v.trim((end - es) as usize);
843                    break;
844                } else {
845                    // New write starts in middle of existing write, ends after existing write,
846                    // put start of existing write in insert list, remove existing write.
847                    insert.push((es, v.take(), v.off, (start - es) as usize));
848                    remove.push(ee);
849                }
850            }
851            for end in remove {
852                self.map.remove(&end);
853            }
854            for (start, data, off, len) in insert {
855                self.map
856                    .insert(start + len as u64, DataSlice { data, off, len });
857            }
858            self.map
859                .insert(start + len as u64, DataSlice { data, off, len });
860        }
861    }
862
863    #[cfg(feature = "pstd")]
864    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
865    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
866        if len != 0 {
867            let end = start + len as u64;
868            let mut c = self
869                .map
870                .lower_bound_mut(std::ops::Bound::Excluded(&start))
871                .with_mutable_key();
872            while let Some((eend, v)) = c.next() {
873                let ee = *eend;
874                let es = ee - v.len as u64; // Existing write Start.
875                if es >= end {
876                    // Existing write starts after end of new write, nothing to do.
877                    c.prev();
878                    break;
879                } else if start <= es {
880                    if end < ee {
881                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
882                        v.trim((end - es) as usize);
883                        c.prev();
884                        break;
885                    }
886                    // New write subsumes existing write entirely, remove existing write.
887                    c.remove_prev();
888                } else if end < ee {
889                    // New write starts in middle of existing write, ends before end of existing write,
890                    // trim existing write, insert start of existing write.
891                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
892                    v.trim((end - es) as usize);
893                    c.prev();
894                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
895                    break;
896                } else {
897                    // New write starts in middle of existing write, ends after existing write,
898                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
899                    v.len = (start - es) as usize;
900                    *eend = es + v.len as u64;
901                }
902            }
903            // Insert the new write.
904            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
905        }
906    }
907
908    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
909    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
910        let len = data.len();
911        if len != 0 {
912            let mut done = 0;
913            for (&end, v) in self.map.range(start + 1..) {
914                let es = end - v.len as u64; // Existing write Start.
915                let doff = start + done as u64;
916                if es > doff {
917                    // Read from underlying storage.
918                    let a = min(len - done, (es - doff) as usize);
919                    u.read(doff, &mut data[done..done + a]);
920                    done += a;
921                    if done == len {
922                        return;
923                    }
924                }
925                // Use existing write.
926                let skip = (start + done as u64 - es) as usize;
927                let a = min(len - done, v.len - skip);
928                data[done..done + a].copy_from_slice(v.part(skip, a));
929                done += a;
930                if done == len {
931                    return;
932                }
933            }
934            u.read(start + done as u64, &mut data[done..]);
935        }
936    }
937}
938
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes (filled from `map` in phase 1 of a commit, emptied in phase 2).
    list: GVec<(u64, DataSlice)>,
    /// Size of the main storage; writes starting at or beyond it are written directly during phase 1.
    size: u64,
    /// Stop flag (initialised to false; not otherwise used in the code shown here).
    stop: bool,
}
952
953impl BasicAtomicFile {
954    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
955    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
956        let size = stg.size();
957        let mut result = Box::new(Self {
958            stg: WriteBuffer::new(stg, lim.swbuf),
959            upd: WriteBuffer::new(upd, lim.uwbuf),
960            map: WMap::default(),
961            list: GVec::new(),
962            size,
963            stop: false,
964        });
965        result.init();
966        result
967    }
968
969    /// Apply outstanding updates.
970    fn init(&mut self) {
971        let end = self.upd.stg.read_u64(0);
972        let size = self.upd.stg.read_u64(8);
973        if end == 0 {
974            return;
975        }
976        assert!(end == self.upd.stg.size());
977        let mut pos = 16;
978        while pos < end {
979            let start = self.upd.stg.read_u64(pos);
980            pos += 8;
981            let len = self.upd.stg.read_u64(pos);
982            pos += 8;
983            let mut buf: GVec<u8> = gvec![0; len as usize];
984            self.upd.stg.read(pos, &mut buf);
985            pos += len;
986            self.stg.write(start, &buf);
987        }
988        self.stg.commit(size);
989        self.upd.commit(0);
990    }
991
992    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
993    pub fn commit_phase(&mut self, size: u64, phase: u8) {
994        if self.map.is_empty() && self.list.is_empty() {
995            return;
996        }
997        if phase == 1 {
998            self.list = self.map.convert_to_vec();
999
1000            // Write the updates to upd.
1001            // First set the end position to zero.
1002            self.upd.write_u64(0, 0);
1003            self.upd.write_u64(8, size);
1004            self.upd.commit(16); // Not clear if this is necessary.
1005
1006            // Write the update records.
1007            let mut stg_written = false;
1008            let mut pos: u64 = 16;
1009            for (start, v) in self.list.iter() {
1010                let (start, len, data) = (*start, v.len as u64, v.all());
1011                if start >= self.size {
1012                    // Writes beyond current stg size can be written directly.
1013                    stg_written = true;
1014                    self.stg.write(start, data);
1015                } else {
1016                    self.upd.write_u64(pos, start);
1017                    pos += 8;
1018                    self.upd.write_u64(pos, len);
1019                    pos += 8;
1020                    self.upd.write(pos, data);
1021                    pos += len;
1022                }
1023            }
1024            if stg_written {
1025                self.stg.commit(size);
1026            }
1027            self.upd.commit(pos); // Not clear if this is necessary.
1028
1029            // Set the end position.
1030            self.upd.write_u64(0, pos);
1031            self.upd.write_u64(8, size);
1032            self.upd.commit(pos);
1033        } else {
1034            for (start, v) in self.list.iter() {
1035                if *start < self.size {
1036                    // Writes beyond current stg size have already been written.
1037                    self.stg.write(*start, v.all());
1038                }
1039            }
1040            self.list = GVec::new();
1041            self.stg.commit(size);
1042            self.upd.commit(0);
1043        }
1044    }
1045}
1046
1047impl BasicStorage for BasicAtomicFile {
1048    fn commit(&mut self, size: u64) {
1049        if self.stop { return; }
1050        self.commit_phase(size, 1);
1051        self.commit_phase(size, 2);
1052        self.size = size;
1053    }
1054
1055    fn size(&self) -> u64 {
1056        self.size
1057    }
1058
1059    fn read(&self, start: u64, data: &mut [u8]) {
1060        self.map.read(start, data, &*self.stg.stg);
1061    }
1062
1063    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1064        self.map.write(start, data, off, len);
1065    }
1066
1067    fn write(&mut self, start: u64, data: &[u8]) {
1068        let len = data.len();
1069        let d = Arc::new(data.to_vec());
1070        self.write_data(start, d, 0, len);
1071    }
1072
1073    fn shutdown(&mut self)
1074    {
1075        self.stop = true;
1076    }
1077}
1078
/// Optimized implementation of [Storage] ( unix only ).
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// Current file size, shared with every clone of this storage.
    size: Arc<Mutex<u64>>,
    /// Handle to the underlying file.
    f: fs::File,
}

#[cfg(target_family = "unix")]
impl UnixFileStorage {
    /// Construct from filename.
    ///
    /// The file is opened for reading and writing and is created if it does not exist;
    /// existing content is kept. The initial size is the current length of the file.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened or its end position cannot be determined.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        // Seeking to the end yields the current length of the file.
        let end = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(end)),
            f,
        })
    }
}
1101
1102#[cfg(target_family = "unix")]
1103impl Storage for UnixFileStorage {
1104    fn clone(&self) -> Box<dyn Storage> {
1105        Box::new(Self {
1106            size: self.size.clone(),
1107            f: self.f.try_clone().unwrap(),
1108        })
1109    }
1110}
1111
1112#[cfg(target_family = "unix")]
1113use std::os::unix::fs::FileExt;
1114
1115#[cfg(target_family = "unix")]
1116impl BasicStorage for UnixFileStorage {
1117    fn read(&self, start: u64, data: &mut [u8]) {
1118        let _ = self.f.read_at(data, start);
1119    }
1120
1121    fn write(&mut self, start: u64, data: &[u8]) {
1122        let _ = self.f.write_at(data, start);
1123    }
1124
1125    fn size(&self) -> u64 {
1126        *self.size.lock().unwrap()
1127    }
1128
1129    fn commit(&mut self, size: u64) {
1130        *self.size.lock().unwrap() = size;
1131        self.f.set_len(size).unwrap();
1132        self.f.sync_all().unwrap();
1133    }
1134}
1135
/// Optimized implementation of [Storage] ( windows only ).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Current file size, shared with every clone of this storage.
    size: Arc<Mutex<u64>>,
    /// Handle to the underlying file.
    f: fs::File,
}

#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename.
    ///
    /// The file is opened for reading and writing and is created if it does not exist;
    /// existing content is kept. The initial size is the current length of the file.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened or its end position cannot be determined.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        // Seeking to the end yields the current length of the file.
        let end = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(end)),
            f,
        })
    }
}
1158
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Make another handle onto the same file. The clone shares the size with the original.
    ///
    /// # Panics
    ///
    /// Panics if the file handle cannot be duplicated.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1168
1169#[cfg(target_family = "windows")]
1170use std::os::windows::fs::FileExt;
1171
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read into `data` from position `start`.
    ///
    /// A single `seek_read` call may return fewer bytes than requested, so reading is repeated
    /// until `data` is full or the end of the file is reached. Bytes beyond the end of the
    /// file are left unchanged. Read errors are not reported ( this function has no way to
    /// return them ); reading simply stops, leaving the remaining bytes unchanged.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                // End of file.
                Ok(0) => break,
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(_) => break,
            }
        }
    }

    /// Write all of `data` at position `start`.
    ///
    /// A single `seek_write` call may write fewer bytes than requested, so writing is repeated
    /// until all of `data` has been written.
    ///
    /// # Panics
    ///
    /// Panics if the data cannot be written in full. A write that is silently lost and then
    /// followed by a successful `commit` would corrupt the stored data.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => panic!("WindowsFileStorage: write failed: no bytes written"),
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage: write failed: {}", e),
            }
        }
    }

    /// The size set by the most recent commit ( initially the length of the file when opened ).
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record the new size, set the file length to `size` and sync the file to disk.
    ///
    /// # Panics
    ///
    /// Panics if the file length cannot be set or the file cannot be synced.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1192
/// Optimised Storage ( varies according to platform ).
///
/// On windows targets this is [WindowsFileStorage].
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Optimised Storage ( varies according to platform ).
///
/// On unix targets this is [UnixFileStorage].
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Optimised Storage ( varies according to platform ).
///
/// On targets that are neither unix nor windows this is [AnyFileStorage].
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Fast Storage for upd file ( varies according to platform ).
///
/// On windows and unix targets this is the same type as [MultiFileStorage].
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Fast Storage for upd file ( varies according to platform ).
///
/// On targets that are neither windows nor unix this is [UpdFileStorage].
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1212
#[cfg(test)]
/// Amount of testing to perform, taken from the environment variable TA.
/// Defaults to 1 when TA cannot be read; panics if the value is not a valid number.
fn test_amount() -> usize {
    let setting = std::env::var("TA").unwrap_or("1".to_string());
    setting.parse::<usize>().unwrap()
}
1218
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // An AtomicFile and a MemFile are driven with the same random sequence of writes,
    // reads and commits; every read must return the same bytes from both.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        // Alternative subject under test:
        // let mut s1 = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            let is_write: bool = rng.r#gen();
            if is_write {
                // Write the same `len` random bytes to both.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                s1.write(off, &bytes);
                s2.write(off, &bytes);
            } else {
                // Read `len` bytes from both and compare.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off, &mut b2);
                s2.read(off, &mut b3);
                assert!(b2 == b3);
            }
            // Commit both, on average once every 50 operations.
            let do_commit = rng.r#gen::<usize>() % 50 == 0;
            if do_commit {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}