// atom_file/lib.rs
1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5#![deny(missing_docs)]
6
7use rustc_hash::FxHashMap as HashMap;
8use std::cmp::min;
9use std::sync::{Arc, Mutex, RwLock};
10use std::cell::RefCell;
11
12#[cfg(feature = "pstd")]
13use pstd::collections::BTreeMap;
14#[cfg(not(feature = "pstd"))]
15use std::collections::BTreeMap;
16
/// Shared immutable byte buffer: ```Arc<Vec<u8>>``` ( cheap to clone ).
pub type Data = Arc<Vec<u8>>;
19
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates are written here.
    map: WMap,
    /// Underlying file, with previous updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size.
    size: u64,
    /// For sending update maps to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by update process while it is active.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map.
    map_lim: usize,
}

impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        // The BasicAtomicFile is moved into the save thread below; it shares
        // the underlying storage with the CommitFile via Storage::clone.
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // Exits when the channel is closed ( when the AtomicFile is dropped ).
            while let Ok((size, map)) = rx.recv() {
                // Binding keeps the guard ( inside the LockResult ) alive while saving.
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Tell the CommitFile one batch is fully saved ( may reset its map ).
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
92
// Marker impl: AtomicFile can be shared between threads.
impl Storage for AtomicFile {}

impl BasicStorage for AtomicFile {
    /// Apply the current batch of updates to the CommitFile and send it to the save thread.
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Bound memory use: if the CommitFile map has grown too large,
        // wait for outstanding saves to finish ( which resets it ).
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    /// Size as of the latest commit.
    fn size(&self) -> u64 {
        self.size
    }

    /// Read, checking the current batch of updates first, then the CommitFile.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    /// Record a write in the current batch of updates.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Copy the byte slice and record it as a write.
    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    /// Wait until all outstanding commits have been saved.
    /// Loops, blocking on the busy lock which the save thread holds while active.
    fn wait_complete(&self) {
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
137
/// Shared state: buffered underlying storage plus the map of updates that
/// have been committed but may not yet be fully saved by the save thread.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}

impl CommitFile {
    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
        Self {
            // 256-byte sectors; reads of up to 50 bytes go through the buffer.
            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
            map: WMap::default(),
            todo: 0,
        }
    }

    /// Called ( by the save thread ) when one commit has been fully saved.
    /// Once no commits are outstanding the map and read buffer are discarded,
    /// since the underlying storage is now up to date.
    fn done_one(&mut self) {
        self.todo -= 1;
        if self.todo == 0 {
            self.map = WMap::default();
            self.stg.reset();
        }
    }
}
164
165impl BasicStorage for CommitFile {
166    fn commit(&mut self, _size: u64) {
167        panic!()
168    }
169
170    fn size(&self) -> u64 {
171        panic!()
172    }
173
174    fn read(&self, start: u64, data: &mut [u8]) {
175        self.map.read(start, data, &self.stg);
176    }
177
178    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
179        self.map.write(start, data, off, len);
180    }
181
182    fn write(&mut self, _start: u64, _data: &[u8]) {
183        panic!()
184    }
185}
186
/// Storage interface - BasicStorage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait BasicStorage: Send {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec, taking ownership of it.
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice. Default implementation delegates to write.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 ( little-endian ).
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 ( little-endian ).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Clone. note: provided method panics.
    /// NOTE(review): this shadows [`Clone::clone`] for implementors; consider a distinct name.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }

    /// Wait until current writes are complete. Default implementation does nothing.
    fn wait_complete(&self) {}
}

/// BasicStorage that can be shared between threads.
pub trait Storage: BasicStorage + Sync {}
239
/// Simple implementation of [Storage] using ```Arc<Mutex<Vec<u8>>>```.
#[derive(Default)]
pub struct MemFile {
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Get a new (boxed) MemFile.
    pub fn new() -> Box<Self> {
        Box::default()
    }
}

// Marker impl: MemFile can be shared between threads.
impl Storage for MemFile {}
254
255impl BasicStorage for MemFile {
256    fn size(&self) -> u64 {
257        let v = self.v.lock().unwrap();
258        v.len() as u64
259    }
260
261    fn read(&self, off: u64, bytes: &mut [u8]) {
262        let off = off as usize;
263        let len = bytes.len();
264        let mut v = self.v.lock().unwrap();
265        if off + len > v.len() {
266            v.resize(off + len, 0);
267        }
268        bytes.copy_from_slice(&v[off..off + len]);
269    }
270
271    fn write(&mut self, off: u64, bytes: &[u8]) {
272        let off = off as usize;
273        let len = bytes.len();
274        let mut v = self.v.lock().unwrap();
275        if off + len > v.len() {
276            v.resize(off + len, 0);
277        }
278        v[off..off + len].copy_from_slice(bytes);
279    }
280
281    fn commit(&mut self, size: u64) {
282        let mut v = self.v.lock().unwrap();
283        v.resize(size as usize, 0);
284    }
285
286    fn clone(&self) -> Box<dyn Storage> {
287        Box::new(Self { v: self.v.clone() })
288    }
289}
290
291use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
292
/// Wrapper around [`std::fs::File`] providing offset-based read/write/size/commit.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Construct from filename, creating the file if it does not exist.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Current file size in bytes.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read bytes at offset. Loops to handle short reads ( a single
    /// `File::read` call may return fewer bytes than requested ). On reaching
    /// end-of-file the remainder of the buffer is left unchanged.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                break; // End of file.
            }
            done += n;
        }
    }

    /// Write bytes at offset. Uses write_all so a short write cannot silently drop data.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length, then sync contents and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
338
339/// Can be used for atomic upd file ( does not implement Sync ).
340pub struct FastFileStorage {
341    file: RefCell<FileInner>,
342}
343
344impl FastFileStorage {
345    /// Construct from filename.
346    pub fn new(filename: &str) -> Box<Self> {
347        Box::new(Self {
348            file: RefCell::new(FileInner::new(filename)),
349        })
350    }
351}
352
353impl BasicStorage for FastFileStorage {
354    fn size(&self) -> u64 {
355        self.file.borrow_mut().size()
356    }
357    fn read(&self, off: u64, bytes: &mut [u8]) {
358        self.file.borrow_mut().read(off, bytes);
359    }
360
361    fn write(&mut self, off: u64, bytes: &[u8]) {
362        self.file.borrow_mut().write(off, bytes);
363    }
364
365    fn commit(&mut self, size: u64) {
366        self.file.borrow_mut().commit(size);
367    }
368}
369
370/// Simple implementation of [Storage] using [`std::fs::File`].
371pub struct SimpleFileStorage {
372    file: Arc<Mutex<FileInner>>,
373}
374
375impl SimpleFileStorage {
376    /// Construct from filename.
377    pub fn new(filename: &str) -> Box<Self> {
378        Box::new(Self {
379            file: Arc::new(Mutex::new(FileInner::new(filename))),
380        })
381    }
382}
383
384impl Storage for SimpleFileStorage {}
385
386impl BasicStorage for SimpleFileStorage {
387    fn size(&self) -> u64 {
388        self.file.lock().unwrap().size()
389    }
390
391    fn read(&self, off: u64, bytes: &mut [u8]) {
392        self.file.lock().unwrap().read(off, bytes);
393    }
394
395    fn write(&mut self, off: u64, bytes: &[u8]) {
396        self.file.lock().unwrap().write(off, bytes);
397    }
398
399    fn commit(&mut self, size: u64) {
400        self.file.lock().unwrap().commit(size);
401    }
402
403    fn clone(&self) -> Box<dyn Storage> {
404        Box::new(Self {
405            file: self.file.clone(),
406        })
407    }
408}
409
410/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
411pub struct AnyFileStorage {
412    filename: String,
413    files: Arc<Mutex<Vec<FileInner>>>,
414}
415
416impl AnyFileStorage {
417    /// Create new.
418    pub fn new(filename: &str) -> Box<Self> {
419        Box::new(Self {
420            filename: filename.to_owned(),
421            files: Arc::new(Mutex::new(Vec::new())),
422        })
423    }
424
425    fn get_file(&self) -> FileInner {
426        match self.files.lock().unwrap().pop() {
427            Some(f) => f,
428            _ => FileInner::new(&self.filename),
429        }
430    }
431
432    fn put_file(&self, f: FileInner) {
433        self.files.lock().unwrap().push(f);
434    }
435}
436
437impl Storage for AnyFileStorage {}
438
439impl BasicStorage for AnyFileStorage {
440    fn size(&self) -> u64 {
441        let mut f = self.get_file();
442        let result = f.size();
443        self.put_file(f);
444        result
445    }
446
447    fn read(&self, off: u64, bytes: &mut [u8]) {
448        let mut f = self.get_file();
449        f.read(off, bytes);
450        self.put_file(f);
451    }
452
453    fn write(&mut self, off: u64, bytes: &[u8]) {
454        let mut f = self.get_file();
455        f.write(off, bytes);
456        self.put_file(f);
457    }
458
459    fn commit(&mut self, size: u64) {
460        let mut f = self.get_file();
461        f.commit(size);
462        self.put_file(f);
463    }
464
465    fn clone(&self) -> Box<dyn Storage> {
466        Box::new(Self {
467            filename: self.filename.clone(),
468            files: self.files.clone(),
469        })
470    }
471}
472
473/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
474pub struct DummyFile {}
475impl DummyFile {
476    /// Construct.
477    pub fn new() -> Box<Self> {
478        Box::new(Self {})
479    }
480}
481
482impl Storage for DummyFile {}
483
484impl BasicStorage for DummyFile {
485    fn size(&self) -> u64 {
486        0
487    }
488
489    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
490
491    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
492
493    fn commit(&mut self, _size: u64) {}
494
495    fn clone(&self) -> Box<dyn Storage> {
496        Self::new()
497    }
498}
499
/// Memory configuration limits.
///
/// Construct via [`Limits::default`] and adjust fields as needed
/// ( the struct is non_exhaustive, so it cannot be built with a struct literal ).
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    fn default() -> Self {
        Self {
            map_lim: 5000,
            rbuf_mem: 0x200000,
            swbuf: 0x100000,
            uwbuf: 0x100000,
        }
    }
}
523
/// Write Buffer. Coalesces sequential writes into larger writes to the underlying storage.
struct WriteBuffer {
    /// Current write index into buf.
    ix: usize,
    /// Current file position ( file offset of buf[0] ); u64::MAX when no position established.
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}

impl WriteBuffer {
    /// Construct.
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX, // Sentinel: the first write will always flush ( establishing pos ).
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset,
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // Non-sequential write: flush buffered bytes and restart the buffer at off.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // Space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: flush, continuing at the current logical offset.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write buffered bytes ( if any ) to underlying storage, then set position to new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Commit.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64.
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
589
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers ( in a Mutex: read takes &self ).
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    fn drop(&mut self) {
        self.reset();
    }
}

impl<const N: usize> ReadBufStg<N> {
    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Clears the buffers.
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}

impl<const N: usize> BasicStorage for ReadBufStg<N> {
    /// Read data from storage. Small reads go via the buffers, larger reads go direct.
    fn read(&self, start: u64, data: &mut [u8]) {
        if data.len() <= self.limit {
            self.buf.lock().unwrap().read(&*self.stg, start, data);
        } else {
            self.stg.read(start, data);
        }
    }

    /// Panics.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Panics.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!();
    }

    /// Panics.
    fn commit(&mut self, _size: u64) {
        panic!();
    }
}

// Marker impl: ReadBufStg can be shared between threads.
impl<const N: usize> Storage for ReadBufStg<N> {}
653
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}

impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached buffers.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read via the cache, loading whole N-byte sectors from stg on a miss.
    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64;
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            // Bytes available from this sector.
            let amount = min(data.len() - done, N - disp);

            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction: once the cache reaches max_buf, discard everything.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
694
695#[derive(Default)]
696/// Slice of Data to be written to storage.
697struct DataSlice {
698    /// Slice data.
699    pub data: Data,
700    /// Start of slice.
701    pub off: usize,
702    /// Length of slice.
703    pub len: usize,
704}
705
706impl DataSlice {
707    /// Get reference to the whole slice.
708    pub fn all(&self) -> &[u8] {
709        &self.data[self.off..self.off + self.len]
710    }
711    /// Get reference to part of slice.
712    pub fn part(&self, off: usize, len: usize) -> &[u8] {
713        &self.data[self.off + off..self.off + off + len]
714    }
715    /// Trim specified amount from start of slice.
716    pub fn trim(&mut self, trim: usize) {
717        self.off += trim;
718        self.len -= trim;
719    }
720    /// Take the data.
721    #[allow(dead_code)]
722    pub fn take(&mut self) -> Data {
723        std::mem::take(&mut self.data)
724    }
725}
726
#[derive(Default)]
/// Updateable store based on some underlying storage.
///
/// Writes are kept in a map keyed by the END offset of each slice, so a
/// range scan from start+1 visits every existing write that overlaps a range.
struct WMap {
    /// Map of writes. Key is the end of the slice.
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of (start, slice) pairs, ordered by start.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            // Mutations are deferred to the insert/remove lists, since the map
            // cannot be modified while iterating over a range.
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            // Finally insert the new write itself.
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    /// Cursor-based version for the pstd BTreeMap; avoids the deferred insert/remove lists.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Visit existing writes whose end is after the read start, in order.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Tail beyond the last overlapping write.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
884
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes.
    list: Vec<(u64, DataSlice)>,
    /// Size of main storage as of the last commit.
    size: u64,
}
897
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        // Recover from an interruption between commit phases, if necessary.
        result.init();
        result
    }

    /// Apply outstanding updates.
    ///
    /// Layout of the upd file: [0..8) end position, [8..16) committed size,
    /// then repeated ( start u64, len u64, data[len] ) records up to the end position.
    /// An end position of zero means there is nothing outstanding.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Truncate the upd file to mark it empty.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this makes the upd file valid for recovery by init ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            // Phase 2: apply the saved updates to the main storage.
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            // Truncate the upd file to mark it empty.
            self.upd.commit(0);
        }
    }
}
990
991impl BasicStorage for BasicAtomicFile {
992    fn commit(&mut self, size: u64) {
993        self.commit_phase(size, 1);
994        self.commit_phase(size, 2);
995        self.size = size;
996    }
997
998    fn size(&self) -> u64 {
999        self.size
1000    }
1001
1002    fn read(&self, start: u64, data: &mut [u8]) {
1003        self.map.read(start, data, &*self.stg.stg);
1004    }
1005
1006    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1007        self.map.write(start, data, off, len);
1008    }
1009
1010    fn write(&mut self, start: u64, data: &[u8]) {
1011        let len = data.len();
1012        let d = Arc::new(data.to_vec());
1013        self.write_data(start, d, 0, len);
1014    }
1015}
1016    
/// Optimized implementation of [Storage] ( unix only ), using positioned I/O. 
#[cfg(target_family = "unix")]
pub struct UnixFileStorage
{
    /// Logical file size; shared between clones (see the `clone` impl below).
    size: Arc<Mutex<u64>>,
    /// Underlying OS file handle.
    f: fs::File,
}
1024#[cfg(target_family = "unix")]
1025impl UnixFileStorage
1026{
1027    /// Construct from filename.
1028    pub fn new(filename: &str) -> Box<Self>
1029    {
1030        let mut f = OpenOptions::new()
1031                .read(true)
1032                .write(true)
1033                .create(true)
1034                .truncate(false)
1035                .open(filename)
1036                .unwrap();
1037        let size = f.seek(SeekFrom::End(0)).unwrap();
1038        let size = Arc::new(Mutex::new(size));
1039        Box::new(Self{ size, f })
1040    }
1041}
1042
#[cfg(target_family = "unix")]
// Empty impl: all [Storage] items are evidently satisfied by defaults /
// the [BasicStorage] supertrait (trait declared earlier in this file).
impl Storage for UnixFileStorage {}
1045
1046#[cfg(target_family = "unix")]
1047use std::os::unix::fs::FileExt;
1048
1049#[cfg(target_family = "unix")]
1050impl BasicStorage for UnixFileStorage
1051{
1052    fn read(&self, start: u64, data: &mut [u8])
1053    {
1054        
1055        let _ = self.f.read_at(data, start );
1056    }
1057
1058    fn write(&mut self, start: u64, data: &[u8])
1059    {
1060        
1061        let _ = self.f.write_at(data, start );
1062    }
1063
1064    fn size(&self) -> u64 {
1065        *self.size.lock().unwrap()
1066    }
1067
1068    fn commit(&mut self, size: u64)
1069    {
1070        *self.size.lock().unwrap() = size;
1071        self.f.set_len(size).unwrap();
1072        self.f.sync_all().unwrap();
1073    }
1074
1075    fn clone(&self) -> Box<dyn Storage> {
1076        Box::new(Self {
1077            size: self.size.clone(),
1078            f: self.f.try_clone().unwrap()
1079        })
1080    }
1081}
1082
/// Optimized implementation of [Storage] ( windows only ), using positioned I/O. 
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage
{
    /// Logical file size; shared between clones (see the `clone` impl below).
    size: Arc<Mutex<u64>>,
    /// Underlying OS file handle.
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename. Creates the file if it does not already exist
    /// ( without truncating ); panics on I/O failure.
    pub fn new(filename: &str) -> Box<Self> {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // The current size is the offset of the end of the file.
        let end = file.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(end)),
            f: file,
        })
    }
}
1108
#[cfg(target_family = "windows")]
// Empty impl: all [Storage] items are evidently satisfied by defaults /
// the [BasicStorage] supertrait (trait declared earlier in this file).
impl Storage for WindowsFileStorage {}
1111
1112#[cfg(target_family = "windows")]
1113use std::os::windows::fs::FileExt;
1114
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage
{
    /// Positioned read. `seek_read` may return fewer bytes than requested, so
    /// loop until the buffer is full or EOF; the tail of `data` beyond EOF is
    /// left untouched. (`seek_read` also moves the file cursor, but all access
    /// here is positional, so that is harmless.)
    fn read(&self, start: u64, data: &mut [u8])
    {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // EOF.
                Ok(n) => done += n,
                Err(_) => break, // Best-effort: errors are ignored.
            }
        }
    }

    /// Positioned write. `seek_write` may perform a short write; loop so the
    /// whole buffer is written, fixing silent data loss. Errors are still
    /// ignored, matching the original best-effort policy.
    fn write(&mut self, start: u64, data: &[u8])
    {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) | Err(_) => break,
                Ok(n) => done += n,
            }
        }
    }

    /// Logical size recorded by the last commit (shared between clones).
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record the logical size, truncate/extend the file, then fsync.
    fn commit(&mut self, size: u64)
    {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }

    /// Clones share the size and the underlying open file handle.
    fn clone(&self) -> Box<dyn Storage> {
        Box::new(Self {
            size: self.size.clone(),
            f: self.f.try_clone().unwrap()
        })
    }
}
1148
/// Platform-optimised [Storage] implementation ( windows ).
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Platform-optimised [Storage] implementation ( unix ).
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Fallback [Storage] implementation for other platforms.
#[cfg(not(any(target_family = "unix",target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;
1160
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 ).
/// Panics if TA is set but is not a valid usize.
fn test_amount() -> usize {
    // as_deref avoids allocating a default String just to parse it
    // ( the original `unwrap_or("1".to_string())` allocated eagerly ).
    std::env::var("TA").as_deref().unwrap_or("1").parse().unwrap()
}
1166
#[test]
fn test_atomic_file() {
    use rand::Rng;
    /* Idea of test is to check AtomicFile and MemFile behave the same:
       apply an identical random sequence of writes, reads and commits to
       both and assert every read agrees. Do not reorder or add RNG draws —
       the two storages must see the same operation sequence. */

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        // Fresh pair of storages per outer iteration.
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        // let mut s1 = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random offset in 0..100 and length in 1..=20.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Write the same random bytes to both storages.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read the same range from both and check they agree.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
            // Occasionally commit both (about 1 in 50 operations).
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}