// atom_file crate — lib.rs
1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5#![deny(missing_docs)]
6
7use rustc_hash::FxHashMap as HashMap;
8use std::cmp::min;
9use std::sync::{Arc, Mutex, RwLock};
10use std::cell::RefCell;
11
12#[cfg(feature = "pstd")]
13use pstd::collections::BTreeMap;
14#[cfg(not(feature = "pstd"))]
15use std::collections::BTreeMap;
16
/// Shared byte buffer: ```Arc<Vec<u8>>``` ( cloning is a cheap reference-count bump ).
pub type Data = Arc<Vec<u8>>;
19
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates ( not yet committed ) are written here.
    map: WMap,
    /// Underlying file, with previously committed ( but not yet saved ) updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size as of the latest commit.
    size: u64,
    /// For sending update maps to the background save thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by the save thread while it is active; wait_complete blocks on it.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map; commit waits for saves when exceeded.
    map_lim: usize,
}
50
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // Terminates when the channel is closed ( sender dropped with the AtomicFile ).
            while let Ok((size, map)) = rx.recv() {
                // Hold the busy lock while saving so wait_complete can block rather than spin.
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Decrement outstanding count; CommitFile state is reset when it reaches zero.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
92
93impl Storage for AtomicFile
94{
95   fn clone(&self) -> Box<dyn Storage>
96   {
97       panic!()
98   }
99}
100
impl BasicStorage for AtomicFile {
    /// Commit the current batch of writes : they are applied to the CommitFile map
    /// and sent to the save thread to be written to underlying storage.
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Throttle : if too many unsaved updates have accumulated, wait for the save thread.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    /// Read : uncommitted map first, then CommitFile ( committed but unsaved ), then underlying storage.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    /// Wait until all committed updates have been saved to underlying storage.
    /// Blocks on the busy lock ( held by the save thread while active ) rather than spinning hot.
    fn wait_complete(&self) {
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
143
/// Committed but not yet saved updates, mapped over buffered underlying storage.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates ( discarded once all commits are saved ).
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
152
153impl CommitFile {
154    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
155        Self {
156            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
157            map: WMap::default(),
158            todo: 0,
159        }
160    }
161
162    fn done_one(&mut self) {
163        self.todo -= 1;
164        if self.todo == 0 {
165            self.map = WMap::default();
166            self.stg.reset();
167        }
168    }
169}
170
171impl BasicStorage for CommitFile {
172    fn commit(&mut self, _size: u64) {
173        panic!()
174    }
175
176    fn size(&self) -> u64 {
177        panic!()
178    }
179
180    fn read(&self, start: u64, data: &mut [u8]) {
181        self.map.read(start, data, &self.stg);
182    }
183
184    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
185        self.map.write(start, data, off, len);
186    }
187
188    fn write(&mut self, _start: u64, _data: &[u8]) {
189        panic!()
190    }
191}
192
/// Storage interface - BasicStorage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait BasicStorage: Send {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec ( default wraps it in Arc and delegates to write_data ).
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice ( default delegates to write ).
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 ( little-endian ).
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 ( little-endian ).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Wait until current writes are complete ( default does nothing ).
    fn wait_complete(&self) {}
}
237
/// BasicStorage which is additionally Sync and can be cloned ( e.g. shared between threads ).
pub trait Storage: BasicStorage + Sync {
    /// Clone ( typically shares underlying state ).
    fn clone(&self) -> Box<dyn Storage>;
}
243
/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
#[derive(Default)]
pub struct MemFile {
    /// Shared in-memory byte store.
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Get a new (boxed) MemFile with an empty store.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
256
257impl Storage for MemFile {
258    fn clone(&self) -> Box<dyn Storage> {
259        Box::new(Self { v: self.v.clone() })
260    }
261}
262
263impl BasicStorage for MemFile {
264    fn size(&self) -> u64 {
265        let v = self.v.lock().unwrap();
266        v.len() as u64
267    }
268
269    fn read(&self, off: u64, bytes: &mut [u8]) {
270        let off = off as usize;
271        let len = bytes.len();
272        let mut v = self.v.lock().unwrap();
273        if off + len > v.len() {
274            v.resize(off + len, 0);
275        }
276        bytes.copy_from_slice(&v[off..off + len]);
277    }
278
279    fn write(&mut self, off: u64, bytes: &[u8]) {
280        let off = off as usize;
281        let len = bytes.len();
282        let mut v = self.v.lock().unwrap();
283        if off + len > v.len() {
284            v.resize(off + len, 0);
285        }
286        v[off..off + len].copy_from_slice(bytes);
287    }
288
289    fn commit(&mut self, size: u64) {
290        let mut v = self.v.lock().unwrap();
291        v.resize(size as usize, 0);
292    }
293}
294
295use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
296
/// Wrapper for [`std::fs::File`] with offset-based read/write helpers.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Construct from filename ( the file is created if it does not exist ).
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Current file size in bytes.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read from the specified offset. Loops on partial reads ( a single
    /// `read` call may return fewer bytes than requested ); bytes beyond
    /// end-of-file are left unchanged in the output buffer.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                break; // End of file.
            }
            done += n;
        }
    }

    /// Write at the specified offset, extending the file if necessary.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all : a plain write may be partial, which would silently drop data.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length and sync contents and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
342
/// Can be used for atomic upd file ( does not implement Sync ).
pub struct FastFileStorage {
    /// Underlying file ( RefCell gives interior mutability through &self without a lock ).
    file: RefCell<FileInner>,
}

impl FastFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: RefCell::new(FileInner::new(filename)),
        })
    }
}
356
357impl BasicStorage for FastFileStorage {
358    fn size(&self) -> u64 {
359        self.file.borrow_mut().size()
360    }
361    fn read(&self, off: u64, bytes: &mut [u8]) {
362        self.file.borrow_mut().read(off, bytes);
363    }
364
365    fn write(&mut self, off: u64, bytes: &[u8]) {
366        self.file.borrow_mut().write(off, bytes);
367    }
368
369    fn commit(&mut self, size: u64) {
370        self.file.borrow_mut().commit(size);
371    }
372}
373
374/// Simple implementation of [Storage] using [`std::fs::File`].
375pub struct SimpleFileStorage {
376    file: Arc<Mutex<FileInner>>,
377}
378
379impl SimpleFileStorage {
380    /// Construct from filename.
381    pub fn new(filename: &str) -> Box<Self> {
382        Box::new(Self {
383            file: Arc::new(Mutex::new(FileInner::new(filename))),
384        })
385    }
386}
387
388impl Storage for SimpleFileStorage {
389    fn clone(&self) -> Box<dyn Storage> {
390        Box::new(Self {
391            file: self.file.clone(),
392        })
393    }
394}
395
396impl BasicStorage for SimpleFileStorage {
397    fn size(&self) -> u64 {
398        self.file.lock().unwrap().size()
399    }
400
401    fn read(&self, off: u64, bytes: &mut [u8]) {
402        self.file.lock().unwrap().read(off, bytes);
403    }
404
405    fn write(&mut self, off: u64, bytes: &[u8]) {
406        self.file.lock().unwrap().write(off, bytes);
407    }
408
409    fn commit(&mut self, size: u64) {
410        self.file.lock().unwrap().commit(size);
411    }
412}
413
/// Alternative to SimpleFileStorage that uses a pool of file handles to allow parallel reads by different threads.
pub struct AnyFileStorage {
    /// Name of the file ( used to open additional handles on demand ).
    filename: String,
    /// Pool of available file handles.
    files: Arc<Mutex<Vec<FileInner>>>,
}

impl AnyFileStorage {
    /// Create new.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            filename: filename.to_owned(),
            files: Arc::new(Mutex::new(Vec::new())),
        })
    }

    /// Take a file handle from the pool, opening a new one if the pool is empty.
    fn get_file(&self) -> FileInner {
        match self.files.lock().unwrap().pop() {
            Some(f) => f,
            _ => FileInner::new(&self.filename),
        }
    }

    /// Return a file handle to the pool.
    fn put_file(&self, f: FileInner) {
        self.files.lock().unwrap().push(f);
    }
}
440
441impl Storage for AnyFileStorage {
442     fn clone(&self) -> Box<dyn Storage> {
443        Box::new(Self {
444            filename: self.filename.clone(),
445            files: self.files.clone(),
446        })
447    }
448}
449
450impl BasicStorage for AnyFileStorage {
451    fn size(&self) -> u64 {
452        let mut f = self.get_file();
453        let result = f.size();
454        self.put_file(f);
455        result
456    }
457
458    fn read(&self, off: u64, bytes: &mut [u8]) {
459        let mut f = self.get_file();
460        f.read(off, bytes);
461        self.put_file(f);
462    }
463
464    fn write(&mut self, off: u64, bytes: &[u8]) {
465        let mut f = self.get_file();
466        f.write(off, bytes);
467        self.put_file(f);
468    }
469
470    fn commit(&mut self, size: u64) {
471        let mut f = self.get_file();
472        f.commit(size);
473        self.put_file(f);
474    }
475}
476
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
// Default derived so that callers can use the standard construction idiom
// ( also satisfies clippy::new_without_default ).
#[derive(Default)]
pub struct DummyFile {}

impl DummyFile {
    /// Construct ( boxed ).
    pub fn new() -> Box<Self> {
        Box::default()
    }
}
485
486impl Storage for DummyFile {
487    fn clone(&self) -> Box<dyn Storage> {
488        Self::new()
489    }
490}
491
impl BasicStorage for DummyFile {
    /// Size is always zero.
    fn size(&self) -> u64 {
        0
    }

    /// Reads do nothing ( output buffer left untouched ).
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// Writes are discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// Commit is a no-op.
    fn commit(&mut self, _size: u64) {}
}
503
/// Memory configuration limits.
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    /// Default values as documented on each field.
    fn default() -> Self {
        let (map_lim, rbuf_mem) = (5000, 0x200000);
        let (swbuf, uwbuf) = (0x100000, 0x100000);
        Self {
            map_lim,
            rbuf_mem,
            swbuf,
            uwbuf,
        }
    }
}
527
/// Write Buffer : coalesces sequential writes into larger writes to the underlying storage.
struct WriteBuffer {
    /// Current write index into buf ( number of buffered bytes ).
    ix: usize,
    /// File position corresponding to the start of buf ( u64::MAX when unset ).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}
539
impl WriteBuffer {
    /// Construct with the specified buffer size.
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX, // No position yet; the first write will flush and set pos.
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset,
    /// buffering so that sequential writes reach the underlying storage as one large write.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // Non-sequential write : flush the buffer and restart it at off.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            let mut n: usize = self.buf.len() - self.ix; // Space remaining in buffer.
            if n == 0 {
                // Buffer full : flush and restart it at the current write position.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write buffered bytes to underlying storage, then restart the buffer at new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Commit ( buffered bytes are flushed first ).
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 ( little-endian ).
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
593
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers.
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    /// Release buffered memory when dropped.
    fn drop(&mut self) {
        self.reset();
    }
}
613
614impl<const N: usize> ReadBufStg<N> {
615    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
616    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
617        Self {
618            stg,
619            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
620            limit,
621        }
622    }
623
624    /// Clears the buffers.
625    fn reset(&mut self) {
626        self.buf.lock().unwrap().reset();
627    }
628}
629
630impl<const N: usize> BasicStorage for ReadBufStg<N> {
631    /// Read data from storage.
632    fn read(&self, start: u64, data: &mut [u8]) {
633        if data.len() <= self.limit {
634            self.buf.lock().unwrap().read(&*self.stg, start, data);
635        } else {
636            self.stg.read(start, data);
637        }
638    }
639
640    /// Panics.
641    fn size(&self) -> u64 {
642        panic!()
643    }
644
645    /// Panics.
646    fn write(&mut self, _start: u64, _data: &[u8]) {
647        panic!();
648    }
649
650    /// Panics.
651    fn commit(&mut self, _size: u64) {
652        panic!();
653    }
654}
655
/// Cache of fixed-size sector buffers used by ReadBufStg.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}
662
impl<const N: usize> ReadBuffer<N> {
    /// Construct with the specified buffer count limit.
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached buffers.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read via the cache, loading whole N-byte sectors from stg on a miss.
    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64; // Absolute position of the next byte to read.
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize; // Displacement within the sector.
            let amount = min(data.len() - done, N - disp);

            // Fetch the sector from the cache, loading it from stg if absent.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Simple eviction policy : discard everything once the limit is reached.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
696
#[derive(Default)]
/// Slice of Data to be written to storage.
struct DataSlice {
    /// Slice data ( shared, reference-counted ).
    pub data: Data,
    /// Start of slice within data.
    pub off: usize,
    /// Length of slice.
    pub len: usize,
}
707
708impl DataSlice {
709    /// Get reference to the whole slice.
710    pub fn all(&self) -> &[u8] {
711        &self.data[self.off..self.off + self.len]
712    }
713    /// Get reference to part of slice.
714    pub fn part(&self, off: usize, len: usize) -> &[u8] {
715        &self.data[self.off + off..self.off + off + len]
716    }
717    /// Trim specified amount from start of slice.
718    pub fn trim(&mut self, trim: usize) {
719        self.off += trim;
720        self.len -= trim;
721    }
722    /// Take the data.
723    #[allow(dead_code)]
724    pub fn take(&mut self) -> Data {
725        std::mem::take(&mut self.data)
726    }
727}
728
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end offset of the slice ( start = end - len ).
    map: BTreeMap<u64, DataSlice>,
}
735
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            let start = end - v.len as u64; // Key is the end offset; recover the start.
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only existing writes whose end key exceeds start can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Removals and insertions are deferred to here because the map cannot
            // be structurally modified while range_mut is iterating it.
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first existing write whose end key exceeds start.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before the existing write : read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Tail beyond the last overlapping write.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
886
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes ( map converted to a Vec during phase 1 of commit ).
    list: Vec<(u64, DataSlice)>,
    /// File size as of the latest commit.
    size: u64,
}
899
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates ( recovery after interruption between commit phases ).
    /// upd layout : [0] end position, [8] new file size, then repeated ( start, len, data ) records from offset 16.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // End position zero means no outstanding updates.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        self.upd.commit(0); // Mark the updates as fully applied.
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    /// Phase 1 durably records the updates in upd; phase 2 applies them to stg.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this durably commits the batch : init will replay from here ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            self.upd.commit(0); // Reset upd : the batch is fully applied.
        }
    }
}
992
993impl BasicStorage for BasicAtomicFile {
994    fn commit(&mut self, size: u64) {
995        self.commit_phase(size, 1);
996        self.commit_phase(size, 2);
997        self.size = size;
998    }
999
1000    fn size(&self) -> u64 {
1001        self.size
1002    }
1003
1004    fn read(&self, start: u64, data: &mut [u8]) {
1005        self.map.read(start, data, &*self.stg.stg);
1006    }
1007
1008    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1009        self.map.write(start, data, off, len);
1010    }
1011
1012    fn write(&mut self, start: u64, data: &[u8]) {
1013        let len = data.len();
1014        let d = Arc::new(data.to_vec());
1015        self.write_data(start, d, 0, len);
1016    }
1017}
1018    
/// Optimized implementation of [Storage] ( unix only ).
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    // Current (committed) file size, shared between clones of this storage.
    size: Arc<Mutex<u64>>,
    // Underlying operating-system file.
    f: fs::File,
}

#[cfg(target_family = "unix")]
impl UnixFileStorage {
    /// Construct from filename, creating the file if it does not exist.
    ///
    /// Panics if the file cannot be opened or its metadata cannot be read.
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // Query the length via metadata rather than seeking to the end:
        // avoids the needless `mut` binding and cursor movement.
        let size = f.metadata().unwrap().len();
        let size = Arc::new(Mutex::new(size));
        Box::new(Self { size, f })
    }
}
1044
1045#[cfg(target_family = "unix")]
1046impl Storage for UnixFileStorage {
1047    fn clone(&self) -> Box<dyn Storage> {
1048        Box::new(Self {
1049            size: self.size.clone(),
1050            f: self.f.try_clone().unwrap()
1051        })
1052    }
1053}
1054
1055#[cfg(target_family = "unix")]
1056use std::os::unix::fs::FileExt;
1057
1058#[cfg(target_family = "unix")]
1059impl BasicStorage for UnixFileStorage
1060{
1061    fn read(&self, start: u64, data: &mut [u8])
1062    {
1063        
1064        let _ = self.f.read_at(data, start );
1065    }
1066
1067    fn write(&mut self, start: u64, data: &[u8])
1068    {
1069        
1070        let _ = self.f.write_at(data, start );
1071    }
1072
1073    fn size(&self) -> u64 {
1074        *self.size.lock().unwrap()
1075    }
1076
1077    fn commit(&mut self, size: u64)
1078    {
1079        *self.size.lock().unwrap() = size;
1080        self.f.set_len(size).unwrap();
1081        self.f.sync_all().unwrap();
1082    }
1083}
1084
/// Optimized implementation of [Storage] ( windows only ).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    // Current (committed) file size, shared between clones of this storage.
    size: Arc<Mutex<u64>>,
    // Underlying operating-system file.
    f: fs::File,
}

#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename, creating the file if it does not exist.
    ///
    /// Panics if the file cannot be opened or its metadata cannot be read.
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // Query the length via metadata rather than seeking to the end:
        // avoids the needless `mut` binding and cursor movement.
        let size = f.metadata().unwrap().len();
        let size = Arc::new(Mutex::new(size));
        Box::new(Self { size, f })
    }
}
1110
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Clone by duplicating the OS file handle; the size is shared.
    fn clone(&self) -> Box<dyn Storage> {
        let f = self.f.try_clone().unwrap();
        let size = Arc::clone(&self.size);
        Box::new(Self { size, f })
    }
}
1120
1121#[cfg(target_family = "windows")]
1122use std::os::windows::fs::FileExt;
1123
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read into `data` starting at file offset `start`.
    ///
    /// `seek_read` may transfer fewer bytes than requested, so loop until
    /// the buffer is full or end-of-file; bytes past end-of-file are left
    /// untouched. Non-retryable errors are ignored, matching the previous
    /// single-call behaviour.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(_) => break,
            }
        }
    }

    /// Write `data` at file offset `start`.
    ///
    /// `seek_write` may also write fewer bytes than requested; loop until
    /// everything is written. Non-retryable errors are ignored as before.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => break,
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(_) => break,
            }
        }
    }

    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record the new size, truncate/extend the file to it, and flush to disk.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1150
/// Optimised [Storage] for the target platform ( windows ).
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Optimised [Storage] for the target platform ( unix ).
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Optimised [Storage] for the target platform ( fallback for targets that are neither unix nor windows ).
#[cfg(not(any(target_family = "unix",target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;
1162
#[cfg(test)]
/// Get amount of testing from environment variable TA ( default 1 ).
///
/// Panics if TA is set but is not a valid `usize`.
fn test_amount() -> usize {
    // `unwrap_or_else` avoids allocating the default string when TA is set.
    std::env::var("TA")
        .unwrap_or_else(|_| String::from("1"))
        .parse()
        .unwrap()
}
1168
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Differential test: AtomicFile and a plain MemFile must behave
    // identically under a random sequence of writes, reads and commits.
    // (BasicAtomicFile can be substituted for s1 to test it the same way.)

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            let do_write: bool = rng.r#gen();
            if do_write {
                // Apply the same random bytes to both files.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                s1.write(off, &bytes);
                s2.write(off, &bytes);
            } else {
                // Both files must read back identical data.
                let mut got1 = vec![0; len];
                let mut got2 = vec![0; len];
                s1.read(off, &mut got1);
                s2.read(off, &mut got2);
                assert_eq!(got1, got2);
            }
            // Occasionally commit both files at the same size.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}