Skip to main content

atom_file/
lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4//!
5//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
6//!
7//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.
8//!
9//!# Features
10//!
11//! This crate supports the following cargo features:
12//! - `pstd` : Use pstd crate for `BTreeMap` (allocated in `GTemp`).
13//! - `unsafe-optim` : Enable unsafe optimisations in release mode.
14
15#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24    VecA,
25    collections::{BTreeMapA, btree_map::CustomTuning},
26    localalloc::GTemp,
27    veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
/// With the `pstd` feature: pstd's B-tree map, allocating in `GTemp`.
#[cfg(feature = "pstd")]
type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;

/// With the `pstd` feature: pstd's vector, allocating in `GTemp`.
#[cfg(feature = "pstd")]
type GVec<T> = VecA<T, GTemp>;
38
/// Shared byte buffer, `Arc<Vec<u8>>`.
///
/// Cloning a `Data` only bumps a reference count, so several pending writes can refer to
/// (slices of) one buffer without copying it.
pub type Data = Arc<Vec<u8>>;
41
42/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
43/// Performs commit asyncronously.
44///
45/// #Example
46///
47/// ```
48/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
49/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
50/// af.write( 0, &[1,2,3,4] );
51/// af.commit(4);
52/// af.wait_complete();
53/// ```
54///
55/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
56/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
57/// the updates to underlying storage have been applied.
58pub struct AtomicFile {
59    /// New updates are written here.
60    map: WMap,
61    /// Underlying file, with previous updates mapped.
62    cf: Arc<RwLock<CommitFile>>,
63    /// File size.
64    size: u64,
65    /// For sending update maps to be saved.
66    tx: std::sync::mpsc::Sender<(u64, WMap)>,
67    /// Held by update process while it is active.
68    busy: Arc<Mutex<()>>,
69    /// Limit on size of CommitFile map.
70    map_lim: usize,
71}
72
73impl AtomicFile {
74    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
75    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76        Self::new_with_limits(stg, upd, &Limits::default())
77    }
78
79    /// Construct Atomic file with specified limits.
80    pub fn new_with_limits(
81        stg: Box<dyn Storage>,
82        upd: Box<dyn BasicStorage>,
83        lim: &Limits,
84    ) -> Box<Self> {
85        let size = stg.size();
86        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.
91
92        // Start the thread which does save asyncronously.
93        let (cf1, busy1) = (cf.clone(), busy.clone());
94
95        std::thread::spawn(move || {
96            // Loop that recieves a map of updates and applies it to BasicAtomicFile.
97            while let Ok((size, map)) = rx.recv() {
98                let _lock = busy1.lock();
99                baf.map = map;
100                baf.commit(size);
101                cf1.write().unwrap().done_one();
102            }
103        });
104        Box::new(Self {
105            map: WMap::default(),
106            cf,
107            size,
108            tx,
109            busy,
110            map_lim: lim.map_lim,
111        })
112    }
113}
114
115impl Storage for AtomicFile {
116    fn clone(&self) -> Box<dyn Storage> {
117        panic!()
118    }
119}
120
121impl BasicStorage for AtomicFile {
122    fn commit(&mut self, size: u64) {
123        self.size = size;
124        if self.map.is_empty() {
125            return;
126        }
127        if self.cf.read().unwrap().map.len() > self.map_lim {
128            self.wait_complete();
129        }
130        let map = std::mem::take(&mut self.map);
131        let stop =
132        {
133            let cf = &mut *self.cf.write().unwrap();
134            if cf.stop { true }
135            else
136            {
137                cf.todo += 1;
138                // Apply map of updates to CommitFile.
139                map.to_storage(cf);
140                // Send map of updates to thread to be written to underlying storage.
141                self.tx.send((size, map)).unwrap();
142                false
143            }
144        };
145        while stop {
146            // Program is terminating, loop forever.
147            std::thread::sleep(std::time::Duration::from_millis(100));
148        }
149    }
150
151    fn size(&self) -> u64 {
152        self.size
153    }
154
155    fn read(&self, start: u64, data: &mut [u8]) {
156        self.map.read(start, data, &*self.cf.read().unwrap());
157    }
158
159    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
160        self.map.write(start, data, off, len);
161    }
162
163    fn write(&mut self, start: u64, data: &[u8]) {
164        let len = data.len();
165        let d = Arc::new(data.to_vec());
166        self.write_data(start, d, 0, len);
167    }
168
169    fn wait_complete(&self) {
170       while self.cf.read().unwrap().busy()
171       {
172           let _x = self.busy.lock();
173       }
174    }   
175
176    fn shutdown(&mut self) {
177        self.cf.write().unwrap().stop = true; // Prevents new commits from being added.
178        while self.cf.read().unwrap().todo != 0 {
179           let _x = self.busy.lock();
180       }
181    }       
182}
183
184struct CommitFile {
185    /// Buffered underlying storage.
186    stg: ReadBufStg<256>,
187    /// Map of committed updates.
188    map: WMap,
189    /// Number of outstanding unsaved commits.
190    todo: usize,
191    /// Flag to prevent new commits starting
192    stop: bool,
193}
194
195impl CommitFile {
196    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
197        Self {
198            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
199            map: WMap::default(),
200            todo: 0,
201            stop: false,
202        }
203    }
204
205    fn done_one(&mut self) {
206        self.todo -= 1;
207        if self.todo == 0 {
208            self.map = WMap::default();
209            self.stg.reset();
210        }
211    }
212
213    fn busy(&self) -> bool
214    {
215        self.todo != 0 || self.stop
216    }
217}
218
219impl BasicStorage for CommitFile {
220    fn commit(&mut self, _size: u64) {
221        panic!()
222    }
223
224    fn size(&self) -> u64 {
225        panic!()
226    }
227
228    fn read(&self, start: u64, data: &mut [u8]) {
229        self.map.read(start, data, &self.stg);
230    }
231
232    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
233        self.map.write(start, data, off, len);
234    }
235
236    fn write(&mut self, _start: u64, _data: &[u8]) {
237        panic!()
238    }
239}
240
241/// Storage interface - BasicStorage is some kind of "file" storage.
242///
243/// read and write methods take a start which is a byte offset in the underlying file.
244pub trait BasicStorage: Send {
245    /// Get the size of the underlying storage.
246    /// Note : this is valid initially and after a commit but is not defined after write is called.
247    fn size(&self) -> u64;
248
249    /// Read data.
250    fn read(&self, start: u64, data: &mut [u8]);
251
252    /// Write byte slice to storage.
253    fn write(&mut self, start: u64, data: &[u8]);
254
255    /// Write byte Vec.
256    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
257        let len = data.len();
258        let d = Arc::new(data);
259        self.write_data(start, d, 0, len);
260    }
261
262    /// Write Data slice.
263    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
264        self.write(start, &data[off..off + len]);
265    }
266
267    /// Finish write transaction, size is new size of underlying storage.
268    fn commit(&mut self, size: u64);
269
270    /// Write u64.
271    fn write_u64(&mut self, start: u64, value: u64) {
272        self.write(start, &value.to_le_bytes());
273    }
274
275    /// Read u64.
276    fn read_u64(&self, start: u64) -> u64 {
277        let mut bytes = [0; 8];
278        self.read(start, &mut bytes);
279        u64::from_le_bytes(bytes)
280    }
281
282    /// Wait until current writes are complete.
283    fn wait_complete(&self){}
284
285    /// Called on program termination.
286    fn shutdown(&mut self){}
287}
288
289/// BasicStorage with Sync and clone.
290pub trait Storage: BasicStorage + Sync {
291    /// Clone.
292    fn clone(&self) -> Box<dyn Storage>;
293}
294
295/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
296#[derive(Default)]
297pub struct MemFile {
298    v: Arc<Mutex<Vec<u8>>>,
299}
300
301impl MemFile {
302    /// Get a new (boxed) MemFile.
303    pub fn new() -> Box<Self> {
304        Box::default()
305    }
306}
307
308impl Storage for MemFile {
309    fn clone(&self) -> Box<dyn Storage> {
310        Box::new(Self { v: self.v.clone() })
311    }
312}
313
314impl BasicStorage for MemFile {
315    fn size(&self) -> u64 {
316        let v = self.v.lock().unwrap();
317        v.len() as u64
318    }
319
320    fn read(&self, off: u64, bytes: &mut [u8]) {
321        let off = off as usize;
322        let len = bytes.len();
323        let mut v = self.v.lock().unwrap();
324        if off + len > v.len() {
325            v.resize(off + len, 0);
326        }
327        bytes.copy_from_slice(&v[off..off + len]);
328    }
329
330    fn write(&mut self, off: u64, bytes: &[u8]) {
331        let off = off as usize;
332        let len = bytes.len();
333        let mut v = self.v.lock().unwrap();
334        if off + len > v.len() {
335            v.resize(off + len, 0);
336        }
337        v[off..off + len].copy_from_slice(bytes);
338    }
339
340    fn commit(&mut self, size: u64) {
341        let mut v = self.v.lock().unwrap();
342        v.resize(size as usize, 0);
343    }
344}
345
346use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
347
/// One open read/write handle on a file, shared by the file-backed storage types.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Open `filename` for reading and writing, creating it if needed; existing contents are kept.
    ///
    /// # Panics
    /// Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Self {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        Self { f }
    }

    /// Current file length in bytes.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Fill `bytes` from offset `off`.
    ///
    /// A single `Read::read` call may return fewer bytes than requested, so keep reading until
    /// the buffer is full or end-of-file is reached. Bytes past end-of-file are set to zero.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            match self.f.read(&mut bytes[done..]) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("FileInner::read failed at offset {off}: {e}"),
            }
        }
        bytes[done..].fill(0);
    }

    /// Write all of `bytes` at offset `off`.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // `Write::write` may perform a partial write; `write_all` retries until every byte is written.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to `size` and flush data and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
393
394/// For atomic upd file, if not unix or windows.
395pub struct UpdFileStorage {
396    file: Cell<Option<FileInner>>,
397}
398
399impl UpdFileStorage {
400    /// Construct from filename.
401    pub fn new(filename: &str) -> Box<Self> {
402        Box::new(Self {
403            file: Cell::new(Some(FileInner::new(filename))),
404        })
405    }
406}
407
408impl BasicStorage for UpdFileStorage {
409    fn size(&self) -> u64 {
410        let mut f = self.file.take().unwrap();
411        let result = f.size();
412        self.file.set(Some(f));
413        result
414    }
415    fn read(&self, off: u64, bytes: &mut [u8]) {
416        let mut f = self.file.take().unwrap();
417        f.read(off, bytes);
418        self.file.set(Some(f));
419    }
420
421    fn write(&mut self, off: u64, bytes: &[u8]) {
422        let mut f = self.file.take().unwrap();
423        f.write(off, bytes);
424        self.file.set(Some(f));
425    }
426
427    fn commit(&mut self, size: u64) {
428        let mut f = self.file.take().unwrap();
429        f.commit(size);
430        self.file.set(Some(f));
431    }
432}
433
434/// Simple implementation of [Storage] using [`std::fs::File`].
435pub struct SimpleFileStorage {
436    file: Arc<Mutex<FileInner>>,
437}
438
439impl SimpleFileStorage {
440    /// Construct from filename.
441    pub fn new(filename: &str) -> Box<Self> {
442        Box::new(Self {
443            file: Arc::new(Mutex::new(FileInner::new(filename))),
444        })
445    }
446}
447
448impl Storage for SimpleFileStorage {
449    fn clone(&self) -> Box<dyn Storage> {
450        Box::new(Self {
451            file: self.file.clone(),
452        })
453    }
454}
455
456impl BasicStorage for SimpleFileStorage {
457    fn size(&self) -> u64 {
458        self.file.lock().unwrap().size()
459    }
460
461    fn read(&self, off: u64, bytes: &mut [u8]) {
462        self.file.lock().unwrap().read(off, bytes);
463    }
464
465    fn write(&mut self, off: u64, bytes: &[u8]) {
466        self.file.lock().unwrap().write(off, bytes);
467    }
468
469    fn commit(&mut self, size: u64) {
470        self.file.lock().unwrap().commit(size);
471    }
472}
473
474/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
475pub struct AnyFileStorage {
476    filename: String,
477    files: Arc<Mutex<Vec<FileInner>>>,
478}
479
480impl AnyFileStorage {
481    /// Create new.
482    pub fn new(filename: &str) -> Box<Self> {
483        Box::new(Self {
484            filename: filename.to_owned(),
485            files: Arc::new(Mutex::new(Vec::new())),
486        })
487    }
488
489    fn get_file(&self) -> FileInner {
490        match self.files.lock().unwrap().pop() {
491            Some(f) => f,
492            _ => FileInner::new(&self.filename),
493        }
494    }
495
496    fn put_file(&self, f: FileInner) {
497        self.files.lock().unwrap().push(f);
498    }
499}
500
501impl Storage for AnyFileStorage {
502    fn clone(&self) -> Box<dyn Storage> {
503        Box::new(Self {
504            filename: self.filename.clone(),
505            files: self.files.clone(),
506        })
507    }
508}
509
510impl BasicStorage for AnyFileStorage {
511    fn size(&self) -> u64 {
512        let mut f = self.get_file();
513        let result = f.size();
514        self.put_file(f);
515        result
516    }
517
518    fn read(&self, off: u64, bytes: &mut [u8]) {
519        let mut f = self.get_file();
520        f.read(off, bytes);
521        self.put_file(f);
522    }
523
524    fn write(&mut self, off: u64, bytes: &[u8]) {
525        let mut f = self.get_file();
526        f.write(off, bytes);
527        self.put_file(f);
528    }
529
530    fn commit(&mut self, size: u64) {
531        let mut f = self.get_file();
532        f.commit(size);
533        self.put_file(f);
534    }
535}
536
537/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
538pub struct DummyFile {}
539impl DummyFile {
540    /// Construct.
541    pub fn new() -> Box<Self> {
542        Box::new(Self {})
543    }
544}
545
546impl Storage for DummyFile {
547    fn clone(&self) -> Box<dyn Storage> {
548        Self::new()
549    }
550}
551
552impl BasicStorage for DummyFile {
553    fn size(&self) -> u64 {
554        0
555    }
556
557    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
558
559    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
560
561    fn commit(&mut self, _size: u64) {}
562}
563
/// Memory configuration limits for [`AtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 (2 MiB).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1 MiB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1 MiB).
    pub uwbuf: usize,
}

impl Default for Limits {
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        Limits {
            map_lim: 5_000,
            rbuf_mem: 2 * MIB,
            swbuf: MIB,
            uwbuf: MIB,
        }
    }
}
587
588/// Write Buffer.
589struct WriteBuffer {
590    /// Current write index into buf.
591    ix: usize,
592    /// Current file position.
593    pos: u64,
594    /// Underlying storage.
595    pub stg: Box<dyn BasicStorage>,
596    /// Buffer.
597    buf: Vec<u8>,
598}
599
600impl WriteBuffer {
601    /// Construct.
602    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
603        Self {
604            ix: 0,
605            pos: u64::MAX,
606            stg,
607            buf: vec![0; buf_size],
608        }
609    }
610
611    /// Write data to specified offset,
612    pub fn write(&mut self, off: u64, data: &[u8]) {
613        if self.pos + self.ix as u64 != off {
614            self.flush(off);
615        }
616        let mut done: usize = 0;
617        let mut todo: usize = data.len();
618        while todo > 0 {
619            let mut n: usize = self.buf.len() - self.ix;
620            if n == 0 {
621                self.flush(off + done as u64);
622                n = self.buf.len();
623            }
624            if n > todo {
625                n = todo;
626            }
627            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
628            todo -= n;
629            done += n;
630            self.ix += n;
631        }
632    }
633
634    fn flush(&mut self, new_pos: u64) {
635        if self.ix > 0 {
636            self.stg.write(self.pos, &self.buf[0..self.ix]);
637        }
638        self.ix = 0;
639        self.pos = new_pos;
640    }
641
642    /// Commit.
643    pub fn commit(&mut self, size: u64) {
644        self.flush(u64::MAX);
645        self.stg.commit(size);
646    }
647
648    /// Write u64.
649    pub fn write_u64(&mut self, start: u64, value: u64) {
650        self.write(start, &value.to_le_bytes());
651    }
652}
653
654/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
655///
656/// See implementation of AtomicFile for how this is used in conjunction with WMap.
657///
658/// N is buffer size.
659struct ReadBufStg<const N: usize> {
660    /// Underlying storage.
661    stg: Box<dyn Storage>,
662    /// Buffers.
663    buf: Mutex<ReadBuffer<N>>,
664    /// Read size that is considered small.
665    limit: usize,
666}
667
668impl<const N: usize> Drop for ReadBufStg<N> {
669    fn drop(&mut self) {
670        self.reset();
671    }
672}
673
674impl<const N: usize> ReadBufStg<N> {
675    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
676    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
677        Self {
678            stg,
679            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
680            limit,
681        }
682    }
683
684    /// Clears the buffers.
685    fn reset(&mut self) {
686        self.buf.lock().unwrap().reset();
687    }
688}
689
690impl<const N: usize> BasicStorage for ReadBufStg<N> {
691    /// Read data from storage.
692    fn read(&self, start: u64, data: &mut [u8]) {
693        if data.len() <= self.limit {
694            self.buf.lock().unwrap().read(&*self.stg, start, data);
695        } else {
696            self.stg.read(start, data);
697        }
698    }
699
700    /// Panics.
701    fn size(&self) -> u64 {
702        panic!()
703    }
704
705    /// Panics.
706    fn write(&mut self, _start: u64, _data: &[u8]) {
707        panic!();
708    }
709
710    /// Panics.
711    fn commit(&mut self, _size: u64) {
712        panic!();
713    }
714}
715
716struct ReadBuffer<const N: usize> {
717    /// Maps sector mumbers cached buffers.
718    map: HashMap<u64, Box<[u8; N]>>,
719    /// Maximum number of buffers.
720    max_buf: usize,
721}
722
723impl<const N: usize> ReadBuffer<N> {
724    fn new(max_buf: usize) -> Self {
725        Self {
726            map: HashMap::default(),
727            max_buf,
728        }
729    }
730
731    fn reset(&mut self) {
732        self.map.clear();
733    }
734
735    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
736        let mut done = 0;
737        while done < data.len() {
738            let off = off + done as u64;
739            let sector = off / N as u64;
740            let disp = (off % N as u64) as usize;
741            let amount = min(data.len() - done, N - disp);
742
743            let p = self.map.entry(sector).or_insert_with(|| {
744                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
745                stg.read(sector * N as u64, &mut *p);
746                p
747            });
748            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
749            done += amount;
750        }
751        if self.map.len() >= self.max_buf {
752            self.reset();
753        }
754    }
755}
756
757#[derive(Default)]
758/// Slice of Data to be written to storage.
759struct DataSlice {
760    /// Slice data.
761    pub data: Data,
762    /// Start of slice.
763    pub off: usize,
764    /// Length of slice.
765    pub len: usize,
766}
767
768impl DataSlice {
769    /// Get reference to the whole slice.
770    pub fn all(&self) -> &[u8] {
771        &self.data[self.off..self.off + self.len]
772    }
773    /// Get reference to part of slice.
774    pub fn part(&self, off: usize, len: usize) -> &[u8] {
775        &self.data[self.off + off..self.off + off + len]
776    }
777    /// Trim specified amount from start of slice.
778    pub fn trim(&mut self, trim: usize) {
779        self.off += trim;
780        self.len -= trim;
781    }
782    /// Take the data.
783    #[allow(dead_code)]
784    pub fn take(&mut self) -> Data {
785        std::mem::take(&mut self.data)
786    }
787}
788
#[derive(Default)]
/// Updateable store based on some underlying storage.
///
/// Holds pending writes that overlay an underlying [`BasicStorage`]. Entries never overlap:
/// [`WMap::write`] trims or removes any existing entry that a new write overlaps.
struct WMap {
    /// Map of writes. Key is the end of the slice.
    /// (An entry `(end, v)` covers offsets `end - v.len .. end`.)
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec.
    ///
    /// Returns `(start, slice)` pairs in key (end-offset) order; `self` is left empty.
    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = GVec::with_capacity(map.len());
        for (end, v) in map {
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    ///
    /// Each entry is passed to `stg.write_data`; the data buffers are shared (`Arc` clone), not
    /// copied, and the map itself is left unchanged.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    ///
    /// Records `data[off..off + len]` at `start`; zero-length writes are ignored.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            // Changes to the key set are collected here and applied after the loop, because the
            // map cannot be modified structurally while `range_mut` borrows it.
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only entries whose end is after `start` can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            // Re-insert the surviving leading parts (each keyed by its own end offset).
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    ///
    /// Same case analysis as the non-pstd version, but done in place with a pstd cursor.
    /// NOTE(review): relies on pstd cursor semantics (`lower_bound_mut`, `with_mutable_key`,
    /// `remove_prev`, `insert_before_unchecked`, `insert_after_unchecked`) — confirm against
    /// the pstd documentation before changing.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            // `done` counts bytes of `data` filled so far; entries are visited in offset order.
            let mut done = 0;
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Anything after the last overlapping write comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
946
947/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
948pub struct BasicAtomicFile {
949    /// The main underlying storage.
950    stg: WriteBuffer,
951    /// Temporary storage for updates during commit.
952    upd: WriteBuffer,
953    /// Map of writes.
954    map: WMap,
955    /// List of writes.
956    list: GVec<(u64, DataSlice)>,
957    size: u64,
958    stop: bool,
959}
960
961impl BasicAtomicFile {
962    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
963    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
964        let size = stg.size();
965        let mut result = Box::new(Self {
966            stg: WriteBuffer::new(stg, lim.swbuf),
967            upd: WriteBuffer::new(upd, lim.uwbuf),
968            map: WMap::default(),
969            list: GVec::new(),
970            size,
971            stop: false,
972        });
973        result.init();
974        result
975    }
976
977    /// Apply outstanding updates.
978    fn init(&mut self) {
979        let end = self.upd.stg.read_u64(0);
980        let size = self.upd.stg.read_u64(8);
981        if end == 0 {
982            return;
983        }
984        assert!(end == self.upd.stg.size());
985        let mut pos = 16;
986        while pos < end {
987            let start = self.upd.stg.read_u64(pos);
988            pos += 8;
989            let len = self.upd.stg.read_u64(pos);
990            pos += 8;
991            let mut buf: GVec<u8> = gvec![0; len as usize];
992            self.upd.stg.read(pos, &mut buf);
993            pos += len;
994            self.stg.write(start, &buf);
995        }
996        self.stg.commit(size);
997        self.upd.commit(0);
998    }
999
1000    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
1001    pub fn commit_phase(&mut self, size: u64, phase: u8) {
1002        if self.map.is_empty() && self.list.is_empty() {
1003            return;
1004        }
1005        if phase == 1 {
1006            self.list = self.map.convert_to_vec();
1007
1008            // Write the updates to upd.
1009            // First set the end position to zero.
1010            self.upd.write_u64(0, 0);
1011            self.upd.write_u64(8, size);
1012            self.upd.commit(16); // Not clear if this is necessary.
1013
1014            // Write the update records.
1015            let mut stg_written = false;
1016            let mut pos: u64 = 16;
1017            for (start, v) in self.list.iter() {
1018                let (start, len, data) = (*start, v.len as u64, v.all());
1019                if start >= self.size {
1020                    // Writes beyond current stg size can be written directly.
1021                    stg_written = true;
1022                    self.stg.write(start, data);
1023                } else {
1024                    self.upd.write_u64(pos, start);
1025                    pos += 8;
1026                    self.upd.write_u64(pos, len);
1027                    pos += 8;
1028                    self.upd.write(pos, data);
1029                    pos += len;
1030                }
1031            }
1032            if stg_written {
1033                self.stg.commit(size);
1034            }
1035            self.upd.commit(pos); // Not clear if this is necessary.
1036
1037            // Set the end position.
1038            self.upd.write_u64(0, pos);
1039            self.upd.write_u64(8, size);
1040            self.upd.commit(pos);
1041        } else {
1042            for (start, v) in self.list.iter() {
1043                if *start < self.size {
1044                    // Writes beyond current stg size have already been written.
1045                    self.stg.write(*start, v.all());
1046                }
1047            }
1048            self.list = GVec::new();
1049            self.stg.commit(size);
1050            self.upd.commit(0);
1051        }
1052    }
1053}
1054
1055impl BasicStorage for BasicAtomicFile {
1056    fn commit(&mut self, size: u64) {
1057        if self.stop { return; }
1058        self.commit_phase(size, 1);
1059        self.commit_phase(size, 2);
1060        self.size = size;
1061    }
1062
1063    fn size(&self) -> u64 {
1064        self.size
1065    }
1066
1067    fn read(&self, start: u64, data: &mut [u8]) {
1068        self.map.read(start, data, &*self.stg.stg);
1069    }
1070
1071    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1072        self.map.write(start, data, off, len);
1073    }
1074
1075    fn write(&mut self, start: u64, data: &[u8]) {
1076        let len = data.len();
1077        let d = Arc::new(data.to_vec());
1078        self.write_data(start, d, 0, len);
1079    }
1080
1081    fn shutdown(&mut self)
1082    {
1083        self.stop = true;
1084    }
1085}
1086
/// Optimized implementation of [Storage] ( unix only ).
///
/// Uses positioned I/O (`std::os::unix::fs::FileExt`), so the file cursor is not relied on.
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// Logical file size, shared by every handle produced by `Storage::clone`.
    size: Arc<Mutex<u64>>,
    /// The open file handle.
    f: fs::File,
}
1093#[cfg(target_family = "unix")]
1094impl UnixFileStorage {
1095    /// Construct from filename.
1096    pub fn new(filename: &str) -> Box<Self> {
1097        let mut f = OpenOptions::new()
1098            .read(true)
1099            .write(true)
1100            .create(true)
1101            .truncate(false)
1102            .open(filename)
1103            .unwrap();
1104        let size = f.seek(SeekFrom::End(0)).unwrap();
1105        let size = Arc::new(Mutex::new(size));
1106        Box::new(Self { size, f })
1107    }
1108}
1109
1110#[cfg(target_family = "unix")]
1111impl Storage for UnixFileStorage {
1112    fn clone(&self) -> Box<dyn Storage> {
1113        Box::new(Self {
1114            size: self.size.clone(),
1115            f: self.f.try_clone().unwrap(),
1116        })
1117    }
1118}
1119
1120#[cfg(target_family = "unix")]
1121use std::os::unix::fs::FileExt;
1122
1123#[cfg(target_family = "unix")]
1124impl BasicStorage for UnixFileStorage {
1125    fn read(&self, start: u64, data: &mut [u8]) {
1126        let _ = self.f.read_at(data, start);
1127    }
1128
1129    fn write(&mut self, start: u64, data: &[u8]) {
1130        let _ = self.f.write_at(data, start);
1131    }
1132
1133    fn size(&self) -> u64 {
1134        *self.size.lock().unwrap()
1135    }
1136
1137    fn commit(&mut self, size: u64) {
1138        *self.size.lock().unwrap() = size;
1139        self.f.set_len(size).unwrap();
1140        self.f.sync_all().unwrap();
1141    }
1142}
1143
/// Optimized implementation of [Storage] ( windows only ).
///
/// Uses positioned I/O (`std::os::windows::fs::FileExt`: `seek_read` / `seek_write`).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Logical file size, shared by every handle produced by `Storage::clone`.
    size: Arc<Mutex<u64>>,
    /// The open file handle.
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename.
    ///
    /// Opens `filename` for reading and writing, creating it if it does not exist
    /// and never truncating it. The initial size is the file's current length.
    ///
    /// # Panics
    /// Panics if the file cannot be opened or its length cannot be determined.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        let len = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1166
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Create another handle to the same file. The new handle shares the logical
    /// size with this one.
    ///
    /// # Panics
    /// Panics if the file handle cannot be duplicated.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1176
1177#[cfg(target_family = "windows")]
1178use std::os::windows::fs::FileExt;
1179
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read `data.len()` bytes starting at file offset `start`.
    ///
    /// A single `seek_read` may return fewer bytes than requested, so this loops
    /// until the buffer is full. Bytes that lie beyond the end of the file are left
    /// untouched in `data`.
    ///
    /// # Panics
    /// Panics on any I/O error other than `Interrupted`, which is retried.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let pos = start + done as u64;
            match self.f.seek_read(&mut data[done..], pos) {
                // End of file: nothing more to read.
                Ok(0) => break,
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage read failed at offset {}: {}", pos, e),
            }
        }
    }

    /// Write all of `data` at file offset `start`.
    ///
    /// A single `seek_write` may write fewer bytes than requested, so this loops
    /// until everything is written.
    ///
    /// # Panics
    /// Panics on an I/O error (other than `Interrupted`, which is retried),
    /// consistent with `commit`.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            let pos = start + done as u64;
            match self.f.seek_write(&data[done..], pos) {
                Ok(0) => panic!("WindowsFileStorage write made no progress at offset {}", pos),
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage write failed at offset {}: {}", pos, e),
            }
        }
    }

    /// Logical size, shared with all clones of this storage.
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record `size` as the logical size, set the file length to it, and flush
    /// data and metadata to disk with `sync_all`.
    ///
    /// # Panics
    /// Panics if the lock is poisoned or on an I/O error.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1200
1201/// Optimised Storage ( varies according to platform ).
1202#[cfg(target_family = "windows")]
1203pub type MultiFileStorage = WindowsFileStorage;
1204
1205/// Optimised Storage ( varies according to platform ).
1206#[cfg(target_family = "unix")]
1207pub type MultiFileStorage = UnixFileStorage;
1208
1209/// Optimised Storage ( varies according to platform ).
1210#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1211pub type MultiFileStorage = AnyFileStorage;
1212
1213/// Fast Storage for upd file ( varies according to platform ).
1214#[cfg(any(target_family = "windows", target_family = "unix"))]
1215pub type FastFileStorage = MultiFileStorage;
1216
1217/// Fast Storage for upd file ( varies according to platform ).
1218#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1219pub type FastFileStorage = UpdFileStorage;
1220
#[cfg(test)]
/// Amount of testing to perform, read from the environment variable `TA`
/// (defaults to 1 when `TA` is unset or not valid unicode; panics if it is not a number).
fn test_amount() -> usize {
    let raw = std::env::var("TA").unwrap_or_else(|_| String::from("1"));
    raw.parse::<usize>().unwrap()
}
1226
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Drive an AtomicFile and a plain MemFile with the same random sequence of
    // writes, reads and commits; every read must see identical bytes from both.
    // To exercise the synchronous implementation instead, build `s1` with
    // `BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default())`.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _round in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _step in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            let is_write: bool = rng.r#gen();
            if is_write {
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                s1.write(off, &bytes);
                s2.write(off, &bytes);
            } else {
                let mut got = vec![0; len];
                let mut want = vec![0; len];
                s1.read(off, &mut got);
                s2.read(off, &mut want);
                assert!(got == want);
            }
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}
1268}