Skip to main content

atom_file/
lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4//!
5//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
6//!
7//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.
8//!
//! # Features
10//!
11//! This crate supports the following cargo features:
12//! - `pstd` : Use pstd crate for `BTreeMap` (allocated in `GTemp`).
13//! - `unsafe-optim` : Enable unsafe optimisations in release mode.
14
15#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{VecA, collections::{BTreeMapA,btree_map::CustomTuning}, localalloc::GTemp};
24
25#[cfg(feature = "pstd")]
26type BTreeMap<K,V> = BTreeMapA<K,V,CustomTuning<GTemp>>;
27
28#[cfg(not(feature = "pstd"))]
29use std::collections::BTreeMap;
30
31#[cfg(feature = "pstd")]
32type GVec<T> = VecA<T, GTemp>;
33
34#[cfg(not(feature = "pstd"))]
35type GVec<T> = Vec<T>;
36
/// Shared byte data : ```Arc<Vec<u8>>``` ( cheap to clone, reference-counted ).
pub type Data = Arc<Vec<u8>>;
39
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously on a background thread.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// AtomicFile has two maps of writes. On commit, the latest batch of writes is sent to be written to underlying
/// storage, and is also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New ( uncommitted ) updates are written here.
    map: WMap,
    /// Underlying file, with previously committed ( but not yet saved ) updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size ( set by commit, initially the size of the underlying storage ).
    size: u64,
    /// For sending update maps to the background thread to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by the save thread while it is actively processing a commit.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map; exceeding it makes commit wait for saves to finish.
    map_lim: usize,
}
70
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        // BasicAtomicFile performs the actual two-phase commits; it gets a clone
        // of stg, while the CommitFile below keeps the original for buffered reads.
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // Exits when the channel is closed ( when the AtomicFile is dropped ).
            while let Ok((size, map)) = rx.recv() {
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Tell the CommitFile that one outstanding commit has been saved.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
112
impl Storage for AtomicFile {
    /// Not supported : an AtomicFile cannot be cloned ( it owns its save thread and channel ).
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
118
impl BasicStorage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // If the CommitFile map has grown too large, wait for outstanding saves
        // to complete ( done_one resets the map ) to limit memory use.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        // todo is incremented before the map is sent, so wait_complete cannot
        // observe zero while this commit is still unsaved.
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Read from the local ( uncommitted ) map first, falling back to the CommitFile.
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Loop until the save thread has processed all outstanding commits.
        // Acquiring busy blocks while the save thread is mid-commit.
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
161
/// Underlying storage overlaid with the map of committed-but-not-yet-saved updates.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates, discarded once todo reaches zero.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
170
171impl CommitFile {
172    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
173        Self {
174            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
175            map: WMap::default(),
176            todo: 0,
177        }
178    }
179
180    fn done_one(&mut self) {
181        self.todo -= 1;
182        if self.todo == 0 {
183            self.map = WMap::default();
184            self.stg.reset();
185        }
186    }
187}
188
impl BasicStorage for CommitFile {
    /// Not supported.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported.
    fn size(&self) -> u64 {
        panic!()
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Read from the map of committed updates, falling back to buffered storage.
        self.map.read(start, data, &self.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported ( writes arrive via write_data only ).
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
210
/// Storage interface - BasicStorage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait BasicStorage: Send {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec ( default implementation wraps it in an Arc and calls write_data ).
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice ( default implementation forwards the sub-slice to write ).
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 ( little-endian encoding ).
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 ( little-endian encoding ).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Wait until current writes are complete ( default implementation does nothing ).
    fn wait_complete(&self) {}
}
255
/// BasicStorage with Sync and clone ( clones share the same underlying storage ).
pub trait Storage: BasicStorage + Sync {
    /// Clone.
    fn clone(&self) -> Box<dyn Storage>;
}
261
/// Simple implementation of [Storage] backed by `Arc<Mutex<Vec<u8>>>`.
#[derive(Default)]
pub struct MemFile {
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Allocate a fresh, empty MemFile on the heap.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
274
275impl Storage for MemFile {
276    fn clone(&self) -> Box<dyn Storage> {
277        Box::new(Self { v: self.v.clone() })
278    }
279}
280
281impl BasicStorage for MemFile {
282    fn size(&self) -> u64 {
283        let v = self.v.lock().unwrap();
284        v.len() as u64
285    }
286
287    fn read(&self, off: u64, bytes: &mut [u8]) {
288        let off = off as usize;
289        let len = bytes.len();
290        let mut v = self.v.lock().unwrap();
291        if off + len > v.len() {
292            v.resize(off + len, 0);
293        }
294        bytes.copy_from_slice(&v[off..off + len]);
295    }
296
297    fn write(&mut self, off: u64, bytes: &[u8]) {
298        let off = off as usize;
299        let len = bytes.len();
300        let mut v = self.v.lock().unwrap();
301        if off + len > v.len() {
302            v.resize(off + len, 0);
303        }
304        v[off..off + len].copy_from_slice(bytes);
305    }
306
307    fn commit(&mut self, size: u64) {
308        let mut v = self.v.lock().unwrap();
309        v.resize(size as usize, 0);
310    }
311}
312
313use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
314
/// Thin wrapper over [`std::fs::File`] with offset-based read/write.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Construct from filename, creating the file if it does not exist
    /// ( existing contents are kept; truncate is explicit via commit ).
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// File size ( seek position of end of file ).
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read bytes at offset off.
    ///
    /// Fix: the previous code issued a single `read` call and ignored the
    /// returned count, so a short read left the tail of the buffer stale.
    /// Now short reads are retried until the buffer is full, and any portion
    /// beyond end-of-file is zero-filled ( matching MemFile, where reads past
    /// the end yield zeros ).
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                // End of file : zero-fill the remainder.
                bytes[done..].fill(0);
                break;
            }
            done += n;
        }
    }

    /// Write bytes at offset off.
    ///
    /// Fix: uses `write_all` instead of a single `write` whose count was
    /// ignored — a short write would previously have silently lost data.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to size and sync all data to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
360
361/// For atomic upd file, if not unix or windows.
362pub struct UpdFileStorage {
363    file: Cell<Option<FileInner>>,
364}
365
366impl UpdFileStorage {
367    /// Construct from filename.
368    pub fn new(filename: &str) -> Box<Self> {
369        Box::new(Self {
370            file: Cell::new(Some(FileInner::new(filename))),
371        })
372    }
373}
374
impl BasicStorage for UpdFileStorage {
    // Each method temporarily takes the FileInner out of the Cell and puts it
    // back afterwards ( Cell allows mutating the file through &self ).
    // NOTE(review): if a file operation panics, the Cell is left empty and a
    // later call will panic on unwrap — confirm this is acceptable.
    fn size(&self) -> u64 {
        let mut f = self.file.take().unwrap();
        let result = f.size();
        self.file.set(Some(f));
        result
    }
    fn read(&self, off: u64, bytes: &mut [u8]) {
        let mut f = self.file.take().unwrap();
        f.read(off, bytes);
        self.file.set(Some(f));
    }

    fn write(&mut self, off: u64, bytes: &[u8]) {
        let mut f = self.file.take().unwrap();
        f.write(off, bytes);
        self.file.set(Some(f));
    }

    fn commit(&mut self, size: u64) {
        let mut f = self.file.take().unwrap();
        f.commit(size);
        self.file.set(Some(f));
    }
}
400
401/// Simple implementation of [Storage] using [`std::fs::File`].
402pub struct SimpleFileStorage {
403    file: Arc<Mutex<FileInner>>,
404}
405
406impl SimpleFileStorage {
407    /// Construct from filename.
408    pub fn new(filename: &str) -> Box<Self> {
409        Box::new(Self {
410            file: Arc::new(Mutex::new(FileInner::new(filename))),
411        })
412    }
413}
414
415impl Storage for SimpleFileStorage {
416    fn clone(&self) -> Box<dyn Storage> {
417        Box::new(Self {
418            file: self.file.clone(),
419        })
420    }
421}
422
423impl BasicStorage for SimpleFileStorage {
424    fn size(&self) -> u64 {
425        self.file.lock().unwrap().size()
426    }
427
428    fn read(&self, off: u64, bytes: &mut [u8]) {
429        self.file.lock().unwrap().read(off, bytes);
430    }
431
432    fn write(&mut self, off: u64, bytes: &[u8]) {
433        self.file.lock().unwrap().write(off, bytes);
434    }
435
436    fn commit(&mut self, size: u64) {
437        self.file.lock().unwrap().commit(size);
438    }
439}
440
441/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
442pub struct AnyFileStorage {
443    filename: String,
444    files: Arc<Mutex<Vec<FileInner>>>,
445}
446
447impl AnyFileStorage {
448    /// Create new.
449    pub fn new(filename: &str) -> Box<Self> {
450        Box::new(Self {
451            filename: filename.to_owned(),
452            files: Arc::new(Mutex::new(Vec::new())),
453        })
454    }
455
456    fn get_file(&self) -> FileInner {
457        match self.files.lock().unwrap().pop() {
458            Some(f) => f,
459            _ => FileInner::new(&self.filename),
460        }
461    }
462
463    fn put_file(&self, f: FileInner) {
464        self.files.lock().unwrap().push(f);
465    }
466}
467
468impl Storage for AnyFileStorage {
469    fn clone(&self) -> Box<dyn Storage> {
470        Box::new(Self {
471            filename: self.filename.clone(),
472            files: self.files.clone(),
473        })
474    }
475}
476
477impl BasicStorage for AnyFileStorage {
478    fn size(&self) -> u64 {
479        let mut f = self.get_file();
480        let result = f.size();
481        self.put_file(f);
482        result
483    }
484
485    fn read(&self, off: u64, bytes: &mut [u8]) {
486        let mut f = self.get_file();
487        f.read(off, bytes);
488        self.put_file(f);
489    }
490
491    fn write(&mut self, off: u64, bytes: &[u8]) {
492        let mut f = self.get_file();
493        f.write(off, bytes);
494        self.put_file(f);
495    }
496
497    fn commit(&mut self, size: u64) {
498        let mut f = self.get_file();
499        f.commit(size);
500        self.put_file(f);
501    }
502}
503
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
///
/// Derives `Default` for consistency with [MemFile] and to satisfy the
/// `new_without_default` convention ( a public `new` should come with `Default` ).
#[derive(Default)]
pub struct DummyFile {}

impl DummyFile {
    /// Construct.
    pub fn new() -> Box<Self> {
        Box::new(Self {})
    }
}
512
513impl Storage for DummyFile {
514    fn clone(&self) -> Box<dyn Storage> {
515        Self::new()
516    }
517}
518
impl BasicStorage for DummyFile {
    /// Always reports size zero.
    fn size(&self) -> u64 {
        0
    }

    /// Reads are no-ops; the output buffer is left untouched.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// Writes are discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// Commits are no-ops.
    fn commit(&mut self, _size: u64) {}
}
530
/// Memory configuration limits for [`AtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 ( 1MB ).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 ( 1MB ).
    pub uwbuf: usize,
}

impl Default for Limits {
    fn default() -> Self {
        let mb = 0x100000; // 1MB
        Self {
            map_lim: 5000,
            rbuf_mem: 2 * mb,
            swbuf: mb,
            uwbuf: mb,
        }
    }
}
554
/// Write Buffer. Coalesces contiguous writes into larger writes to the underlying storage.
struct WriteBuffer {
    /// Current write index into buf.
    ix: usize,
    /// File position of the start of the buffer ( u64::MAX when not yet positioned ).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}
566
impl WriteBuffer {
    /// Construct. pos starts at u64::MAX so the first write cannot appear
    /// contiguous and therefore flushes ( setting pos ) before buffering.
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset,
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // If the write is not contiguous with the buffered data, flush first.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // n = space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full : flush, then the whole buffer is available again.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write any buffered bytes to storage ( at pos ), then reposition the buffer at new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Commit.
    pub fn commit(&mut self, size: u64) {
        // Flushing to u64::MAX re-arms the "first write flushes" behaviour ( see new ).
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64.
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
620
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is the buffer ( sector ) size in bytes.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers ( in a Mutex so reads can share &self ).
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}
634
impl<const N: usize> Drop for ReadBufStg<N> {
    /// Clear the cached buffers when the ReadBufStg is dropped.
    fn drop(&mut self) {
        self.reset();
    }
}
640
impl<const N: usize> ReadBufStg<N> {
    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Clears the buffers ( called when the underlying storage changes, and on drop ).
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}
656
657impl<const N: usize> BasicStorage for ReadBufStg<N> {
658    /// Read data from storage.
659    fn read(&self, start: u64, data: &mut [u8]) {
660        if data.len() <= self.limit {
661            self.buf.lock().unwrap().read(&*self.stg, start, data);
662        } else {
663            self.stg.read(start, data);
664        }
665    }
666
667    /// Panics.
668    fn size(&self) -> u64 {
669        panic!()
670    }
671
672    /// Panics.
673    fn write(&mut self, _start: u64, _data: &[u8]) {
674        panic!();
675    }
676
677    /// Panics.
678    fn commit(&mut self, _size: u64) {
679        panic!();
680    }
681}
682
/// Cache of fixed-size sector buffers read from underlying storage.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}
689
impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sector buffers.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read data at offset off, sector by sector, caching each sector fetched.
    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64; // Absolute offset of the next byte needed.
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize; // Displacement within the sector.
            let amount = min(data.len() - done, N - disp);

            // Fetch the sector from cache, reading it from storage on a miss.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction policy : once the cache is full, drop everything.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
723
#[derive(Default)]
/// Slice of Data to be written to storage.
struct DataSlice {
    /// Slice data ( shared, reference-counted ).
    pub data: Data,
    /// Start of slice within data.
    pub off: usize,
    /// Length of slice.
    pub len: usize,
}
734
735impl DataSlice {
736    /// Get reference to the whole slice.
737    pub fn all(&self) -> &[u8] {
738        &self.data[self.off..self.off + self.len]
739    }
740    /// Get reference to part of slice.
741    pub fn part(&self, off: usize, len: usize) -> &[u8] {
742        &self.data[self.off + off..self.off + off + len]
743    }
744    /// Trim specified amount from start of slice.
745    pub fn trim(&mut self, trim: usize) {
746        self.off += trim;
747        self.len -= trim;
748    }
749    /// Take the data.
750    #[allow(dead_code)]
751    pub fn take(&mut self) -> Data {
752        std::mem::take(&mut self.data)
753    }
754}
755
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end of the slice ( start = end - len ).
    map: BTreeMap<u64, DataSlice>,
}
762
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs.
    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = GVec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are slice ends; recover the start as end - len.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only existing writes whose end key is greater than start can overlap.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            // Finally insert the new write itself.
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    /// Cursor-based variant which avoids the temporary insert/remove lists.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write : read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Any remaining tail comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
913
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes being applied by the current two-phase commit.
    list: GVec<(u64, DataSlice)>,
    /// File size ( as of last commit ).
    size: u64,
}
926
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: GVec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates : if upd holds a complete record of a commit
    /// that was not fully applied ( e.g. after a crash ), replay it into stg.
    fn init(&mut self) {
        // upd layout ( written by commit_phase 1 ) :
        //   [0] end position, [8] committed size, then (start,len,data) records from offset 16.
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // No outstanding commit.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Mark the upd file as empty : recovery complete.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    /// Phase 1 records the updates in upd ( so they survive a crash );
    /// phase 2 applies them to stg and then resets upd.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this makes the upd record "valid" for init ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list = GVec::new();
            self.stg.commit(size);
            // Reset upd : end position reads as zero afterwards.
            self.upd.commit(0);
        }
    }
}
1019
1020impl BasicStorage for BasicAtomicFile {
1021    fn commit(&mut self, size: u64) {
1022        self.commit_phase(size, 1);
1023        self.commit_phase(size, 2);
1024        self.size = size;
1025    }
1026
1027    fn size(&self) -> u64 {
1028        self.size
1029    }
1030
1031    fn read(&self, start: u64, data: &mut [u8]) {
1032        self.map.read(start, data, &*self.stg.stg);
1033    }
1034
1035    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1036        self.map.write(start, data, off, len);
1037    }
1038
1039    fn write(&mut self, start: u64, data: &[u8]) {
1040        let len = data.len();
1041        let d = Arc::new(data.to_vec());
1042        self.write_data(start, d, 0, len);
1043    }
1044}
1045
/// Optimized implementation of [Storage] ( unix only ).
///
/// Uses positioned I/O (`read_at` / `write_at`) so no seek position is
/// shared between clones of the file handle.
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// Logical file size, shared between clones of this storage.
    size: Arc<Mutex<u64>>,
    /// Underlying operating-system file handle.
    f: fs::File,
}
1052#[cfg(target_family = "unix")]
1053impl UnixFileStorage {
1054    /// Construct from filename.
1055    pub fn new(filename: &str) -> Box<Self> {
1056        let mut f = OpenOptions::new()
1057            .read(true)
1058            .write(true)
1059            .create(true)
1060            .truncate(false)
1061            .open(filename)
1062            .unwrap();
1063        let size = f.seek(SeekFrom::End(0)).unwrap();
1064        let size = Arc::new(Mutex::new(size));
1065        Box::new(Self { size, f })
1066    }
1067}
1068
1069#[cfg(target_family = "unix")]
1070impl Storage for UnixFileStorage {
1071    fn clone(&self) -> Box<dyn Storage> {
1072        Box::new(Self {
1073            size: self.size.clone(),
1074            f: self.f.try_clone().unwrap(),
1075        })
1076    }
1077}
1078
1079#[cfg(target_family = "unix")]
1080use std::os::unix::fs::FileExt;
1081
1082#[cfg(target_family = "unix")]
1083impl BasicStorage for UnixFileStorage {
1084    fn read(&self, start: u64, data: &mut [u8]) {
1085        let _ = self.f.read_at(data, start);
1086    }
1087
1088    fn write(&mut self, start: u64, data: &[u8]) {
1089        let _ = self.f.write_at(data, start);
1090    }
1091
1092    fn size(&self) -> u64 {
1093        *self.size.lock().unwrap()
1094    }
1095
1096    fn commit(&mut self, size: u64) {
1097        *self.size.lock().unwrap() = size;
1098        self.f.set_len(size).unwrap();
1099        self.f.sync_all().unwrap();
1100    }
1101}
1102
/// Optimized implementation of [Storage] ( windows only ).
///
/// Uses positioned I/O (`seek_read` / `seek_write`) on a duplicated handle.
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Logical file size, shared between clones of this storage.
    size: Arc<Mutex<u64>>,
    /// Underlying operating-system file handle.
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // The initial logical size is the current length of the file on disk.
        let len = f.metadata().unwrap().len();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1125
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// A clone shares the size cell and duplicates the OS file handle.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1135
1136#[cfg(target_family = "windows")]
1137use std::os::windows::fs::FileExt;
1138
1139#[cfg(target_family = "windows")]
1140impl BasicStorage for WindowsFileStorage {
1141    fn read(&self, start: u64, data: &mut [u8]) {
1142        let _ = self.f.seek_read(data, start);
1143    }
1144
1145    fn write(&mut self, start: u64, data: &[u8]) {
1146        let _ = self.f.seek_write(data, start);
1147    }
1148
1149    fn size(&self) -> u64 {
1150        *self.size.lock().unwrap()
1151    }
1152
1153    fn commit(&mut self, size: u64) {
1154        *self.size.lock().unwrap() = size;
1155        self.f.set_len(size).unwrap();
1156        self.f.sync_all().unwrap();
1157    }
1158}
1159
/// Optimised Storage ( varies according to platform ).
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Optimised Storage ( varies according to platform ).
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Optimised Storage ( varies according to platform ).
///
/// Fallback for platforms that are neither unix nor windows.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Fast Storage for upd file ( varies according to platform ).
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Fast Storage for upd file ( varies according to platform ).
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1179
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 ).
fn test_amount() -> usize {
    match std::env::var("TA") {
        Ok(v) => v.parse().unwrap(),
        Err(_) => 1,
    }
}
1185
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Drive an AtomicFile and a plain MemFile with the same random mix of
    // reads, writes and commits, checking that reads always agree.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut af = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut model = MemFile::new();

        for _ in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            if rng.r#gen::<bool>() {
                // Write the same random bytes to both files.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                af.write(off, &bytes);
                model.write(off, &bytes);
            } else {
                // Read the same range from both files and compare.
                let mut got = vec![0u8; len];
                let mut want = vec![0u8; len];
                af.read(off, &mut got);
                model.read(off, &mut want);
                assert!(got == want);
            }
            // Occasionally commit both files at the same size.
            if rng.r#gen::<usize>() % 50 == 0 {
                af.commit(200);
                model.commit(200);
            }
        }
    }
}