// atom_file/lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5#![deny(missing_docs)]
6
7use rustc_hash::FxHashMap as HashMap;
8use std::cmp::min;
9use std::sync::{Arc, Mutex, RwLock};
10use std::cell::RefCell;
11
12#[cfg(feature = "pstd")]
13use pstd::collections::BTreeMap;
14#[cfg(not(feature = "pstd"))]
15use std::collections::BTreeMap;
16
17/// ```Arc<Vec<u8>>```
18pub type Data = Arc<Vec<u8>>;
19
20/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
25/// ```
26/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
27/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
28/// af.write( 0, &[1,2,3,4] );
29/// af.commit(4);
30/// af.wait_complete();
31/// ```
32///
33/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
34/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
35/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New ( not yet committed ) updates are written here.
    map: WMap,
    /// Underlying file, with previous ( committed but not yet saved ) updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size as of the latest commit.
    size: u64,
    /// For sending ( commit size, map of updates ) to the async save thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by update process while it is active; wait_complete blocks on it.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map; when exceeded, commit waits for saves to finish.
    map_lim: usize,
}
50
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        // The BasicAtomicFile owns a clone of stg; the save thread uses it to
        // apply each batch of updates with an all-or-nothing commit.
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does the save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates, applies it to the BasicAtomicFile,
            // then tells the CommitFile that one outstanding save is done.
            while let Ok((size, map)) = rx.recv() {
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
92
impl BasicStorage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Back-pressure: if too many committed-but-unsaved updates have
        // accumulated, wait for the save thread to catch up first.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile, so reads see them immediately.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Newest ( uncommitted ) writes take priority; gaps come from the CommitFile.
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Loop until the save thread has applied all outstanding commits;
        // taking `busy` blocks while the save thread is mid-save.
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
135
/// Underlying storage plus the map of updates that have been committed but not
/// yet saved by the async save thread.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed ( but not yet saved ) updates.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
144
145impl CommitFile {
146    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
147        Self {
148            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
149            map: WMap::default(),
150            todo: 0,
151        }
152    }
153
154    fn done_one(&mut self) {
155        self.todo -= 1;
156        if self.todo == 0 {
157            self.map = WMap::default();
158            self.stg.reset();
159        }
160    }
161}
162
163impl BasicStorage for CommitFile {
164    fn commit(&mut self, _size: u64) {
165        panic!()
166    }
167
168    fn size(&self) -> u64 {
169        panic!()
170    }
171
172    fn read(&self, start: u64, data: &mut [u8]) {
173        self.map.read(start, data, &self.stg);
174    }
175
176    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
177        self.map.write(start, data, off, len);
178    }
179
180    fn write(&mut self, _start: u64, _data: &[u8]) {
181        panic!()
182    }
183}
184
/// Storage interface - BasicStorage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait BasicStorage: Send {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec ( wraps it in an [`Arc`] and delegates to [Self::write_data] ).
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice ( default implementation copies via [Self::write] ).
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 ( little-endian ).
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 ( little-endian ).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Wait until current writes are complete ( default: no-op ).
    fn wait_complete(&self) {}
}
229
/// BasicStorage that can be cloned and shared between threads.
pub trait Storage: BasicStorage + Sync {
    /// Clone, sharing the same underlying storage.
    fn clone(&self) -> Box<dyn Storage>;
}
235
/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>>`.
#[derive(Default)]
pub struct MemFile {
    /// Shared byte buffer holding the file contents.
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Get a new (boxed) MemFile.
    pub fn new() -> Box<Self> {
        Box::default()
    }
}
248
249impl Storage for MemFile {
250    fn clone(&self) -> Box<dyn Storage> {
251        Box::new(Self { v: self.v.clone() })
252    }
253}
254
255impl BasicStorage for MemFile {
256    fn size(&self) -> u64 {
257        let v = self.v.lock().unwrap();
258        v.len() as u64
259    }
260
261    fn read(&self, off: u64, bytes: &mut [u8]) {
262        let off = off as usize;
263        let len = bytes.len();
264        let mut v = self.v.lock().unwrap();
265        if off + len > v.len() {
266            v.resize(off + len, 0);
267        }
268        bytes.copy_from_slice(&v[off..off + len]);
269    }
270
271    fn write(&mut self, off: u64, bytes: &[u8]) {
272        let off = off as usize;
273        let len = bytes.len();
274        let mut v = self.v.lock().unwrap();
275        if off + len > v.len() {
276            v.resize(off + len, 0);
277        }
278        v[off..off + len].copy_from_slice(bytes);
279    }
280
281    fn commit(&mut self, size: u64) {
282        let mut v = self.v.lock().unwrap();
283        v.resize(size as usize, 0);
284    }
285}
286
287use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
288
/// Wrapper for [`fs::File`] with offset-based read/write helpers.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Construct from filename, creating the file if it does not exist.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// File size in bytes.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read bytes at the specified offset.
    ///
    /// Loops to cope with short reads ( `Read::read` may return fewer bytes
    /// than requested even before end of file ). On end of file the remainder
    /// of `bytes` is left untouched ( callers pass zero-initialized buffers ).
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                break; // End of file.
            }
            done += n;
        }
    }

    /// Write bytes at the specified offset, extending the file if necessary.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all loops on short writes, unlike a bare `write`.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length and sync all data to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
334
/// Can be used for atomic upd file ( does not implement Sync ).
pub struct FastFileStorage {
    /// Inner file handle; RefCell gives interior mutability for read/size on &self.
    file: RefCell<FileInner>,
}

impl FastFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: RefCell::new(FileInner::new(filename)),
        })
    }
}
348
349impl BasicStorage for FastFileStorage {
350    fn size(&self) -> u64 {
351        self.file.borrow_mut().size()
352    }
353    fn read(&self, off: u64, bytes: &mut [u8]) {
354        self.file.borrow_mut().read(off, bytes);
355    }
356
357    fn write(&mut self, off: u64, bytes: &[u8]) {
358        self.file.borrow_mut().write(off, bytes);
359    }
360
361    fn commit(&mut self, size: u64) {
362        self.file.borrow_mut().commit(size);
363    }
364}
365
/// Simple implementation of [Storage] using [`std::fs::File`].
pub struct SimpleFileStorage {
    /// Shared inner file handle; the Mutex allows clones on multiple threads.
    file: Arc<Mutex<FileInner>>,
}

impl SimpleFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: Arc::new(Mutex::new(FileInner::new(filename))),
        })
    }
}
379
380impl Storage for SimpleFileStorage {
381    fn clone(&self) -> Box<dyn Storage> {
382        Box::new(Self {
383            file: self.file.clone(),
384        })
385    }
386}
387
388impl BasicStorage for SimpleFileStorage {
389    fn size(&self) -> u64 {
390        self.file.lock().unwrap().size()
391    }
392
393    fn read(&self, off: u64, bytes: &mut [u8]) {
394        self.file.lock().unwrap().read(off, bytes);
395    }
396
397    fn write(&mut self, off: u64, bytes: &[u8]) {
398        self.file.lock().unwrap().write(off, bytes);
399    }
400
401    fn commit(&mut self, size: u64) {
402        self.file.lock().unwrap().commit(size);
403    }
404}
405
406/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
407pub struct AnyFileStorage {
408    filename: String,
409    files: Arc<Mutex<Vec<FileInner>>>,
410}
411
412impl AnyFileStorage {
413    /// Create new.
414    pub fn new(filename: &str) -> Box<Self> {
415        Box::new(Self {
416            filename: filename.to_owned(),
417            files: Arc::new(Mutex::new(Vec::new())),
418        })
419    }
420
421    fn get_file(&self) -> FileInner {
422        match self.files.lock().unwrap().pop() {
423            Some(f) => f,
424            _ => FileInner::new(&self.filename),
425        }
426    }
427
428    fn put_file(&self, f: FileInner) {
429        self.files.lock().unwrap().push(f);
430    }
431}
432
433impl Storage for AnyFileStorage {
434     fn clone(&self) -> Box<dyn Storage> {
435        Box::new(Self {
436            filename: self.filename.clone(),
437            files: self.files.clone(),
438        })
439    }
440}
441
442impl BasicStorage for AnyFileStorage {
443    fn size(&self) -> u64 {
444        let mut f = self.get_file();
445        let result = f.size();
446        self.put_file(f);
447        result
448    }
449
450    fn read(&self, off: u64, bytes: &mut [u8]) {
451        let mut f = self.get_file();
452        f.read(off, bytes);
453        self.put_file(f);
454    }
455
456    fn write(&mut self, off: u64, bytes: &[u8]) {
457        let mut f = self.get_file();
458        f.write(off, bytes);
459        self.put_file(f);
460    }
461
462    fn commit(&mut self, size: u64) {
463        let mut f = self.get_file();
464        f.commit(size);
465        self.put_file(f);
466    }
467}
468
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct DummyFile {}

impl DummyFile {
    /// Construct ( boxed ). `DummyFile` also implements [`Default`],
    /// matching the convention used by [`MemFile::new`].
    pub fn new() -> Box<Self> {
        Box::default()
    }
}
477
478impl Storage for DummyFile {
479    fn clone(&self) -> Box<dyn Storage> {
480        Self::new()
481    }
482}
483
impl BasicStorage for DummyFile {
    /// Always reports size zero.
    fn size(&self) -> u64 {
        0
    }

    /// No-op: the output buffer is left unchanged.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// No-op: data is discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// No-op.
    fn commit(&mut self, _size: u64) {}
}
495
/// Memory configuration limits.
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map ( number of entries ) before
    /// [AtomicFile] waits for outstanding saves, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}
508
509impl Default for Limits {
510    fn default() -> Self {
511        Self {
512            map_lim: 5000,
513            rbuf_mem: 0x200000,
514            swbuf: 0x100000,
515            uwbuf: 0x100000,
516        }
517    }
518}
519
/// Write Buffer: coalesces contiguous writes before passing them to storage.
struct WriteBuffer {
    /// Current write index into buf ( number of buffered bytes ).
    ix: usize,
    /// File position the buffered bytes start at ( u64::MAX = none ).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}
531
impl WriteBuffer {
    /// Construct. pos starts at u64::MAX as a sentinel meaning "no buffered position yet".
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset,
    /// buffering it; flushes first when the write is not contiguous with the
    /// buffered bytes, and whenever the buffer fills up.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // Invariant: whenever pos == u64::MAX, ix == 0, so this sum cannot overflow.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: write it out, continuing at the current file position.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write any buffered bytes to storage, then set the buffer position to `new_pos`.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Commit: flush buffered bytes, then commit the underlying storage.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 ( little-endian, via [Self::write] ).
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
585
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers ( cache of N-byte sectors, guarded for shared access ).
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small; larger reads bypass the buffers.
    limit: usize,
}
599
600impl<const N: usize> Drop for ReadBufStg<N> {
601    fn drop(&mut self) {
602        self.reset();
603    }
604}
605
606impl<const N: usize> ReadBufStg<N> {
607    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
608    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
609        Self {
610            stg,
611            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
612            limit,
613        }
614    }
615
616    /// Clears the buffers.
617    fn reset(&mut self) {
618        self.buf.lock().unwrap().reset();
619    }
620}
621
622impl<const N: usize> BasicStorage for ReadBufStg<N> {
623    /// Read data from storage.
624    fn read(&self, start: u64, data: &mut [u8]) {
625        if data.len() <= self.limit {
626            self.buf.lock().unwrap().read(&*self.stg, start, data);
627        } else {
628            self.stg.read(start, data);
629        }
630    }
631
632    /// Panics.
633    fn size(&self) -> u64 {
634        panic!()
635    }
636
637    /// Panics.
638    fn write(&mut self, _start: u64, _data: &[u8]) {
639        panic!();
640    }
641
642    /// Panics.
643    fn commit(&mut self, _size: u64) {
644        panic!();
645    }
646}
647
/// Cache of N-byte sectors read from underlying storage.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers; when reached the whole cache is cleared.
    max_buf: usize,
}
654
impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read via the sector cache, loading any missing sector from `stg`.
    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64; // Absolute offset of the next byte needed.
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            // Bytes available from this sector for the current request.
            let amount = min(data.len() - done, N - disp);

            // Fetch the sector from the cache, reading it from storage on a miss.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction: once the cache reaches max_buf sectors, drop everything.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
688
#[derive(Default)]
/// Slice of Data to be written to storage.
struct DataSlice {
    /// Slice data ( shared, reference-counted ).
    pub data: Data,
    /// Start of slice within data.
    pub off: usize,
    /// Length of slice.
    pub len: usize,
}
699
700impl DataSlice {
701    /// Get reference to the whole slice.
702    pub fn all(&self) -> &[u8] {
703        &self.data[self.off..self.off + self.len]
704    }
705    /// Get reference to part of slice.
706    pub fn part(&self, off: usize, len: usize) -> &[u8] {
707        &self.data[self.off + off..self.off + off + len]
708    }
709    /// Trim specified amount from start of slice.
710    pub fn trim(&mut self, trim: usize) {
711        self.off += trim;
712        self.len -= trim;
713    }
714    /// Take the data.
715    #[allow(dead_code)]
716    pub fn take(&mut self) -> Data {
717        std::mem::take(&mut self.data)
718    }
719}
720
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end of the slice ( start = end - len ).
    map: BTreeMap<u64, DataSlice>,
}
727
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are end positions; recover each slice start.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            // Left-hand remnants to re-insert, and keys of writes to remove.
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Visit existing writes whose end position is after the new start.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            // Finally insert the new write itself ( keyed by its end position ).
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first existing write whose end is after `start`.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Visit, in order, existing writes whose end position is after `start`.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Read any remaining tail from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
878
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes ( the map converted to a Vec during commit phase 1 ).
    list: Vec<(u64, DataSlice)>,
    /// File size as of the latest commit.
    size: u64,
}
891
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates ( recovery after an interrupted commit ).
    ///
    /// upd file layout: bytes 0..8 hold the end position ( zero means no
    /// outstanding updates ), bytes 8..16 the committed file size, followed
    /// by ( start: u64, len: u64, data: [u8; len] ) records up to `end`.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        self.upd.commit(0); // Mark the upd file as empty.
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    ///
    /// Phase 1 records the updates in the upd file ( writes entirely beyond
    /// the current stg size go straight to stg ). Phase 2 applies the updates
    /// to stg, then clears the upd file.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this marks the upd file as valid ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            self.upd.commit(0); // Mark the upd file as empty.
        }
    }
}
984
985impl BasicStorage for BasicAtomicFile {
986    fn commit(&mut self, size: u64) {
987        self.commit_phase(size, 1);
988        self.commit_phase(size, 2);
989        self.size = size;
990    }
991
992    fn size(&self) -> u64 {
993        self.size
994    }
995
996    fn read(&self, start: u64, data: &mut [u8]) {
997        self.map.read(start, data, &*self.stg.stg);
998    }
999
1000    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1001        self.map.write(start, data, off, len);
1002    }
1003
1004    fn write(&mut self, start: u64, data: &[u8]) {
1005        let len = data.len();
1006        let d = Arc::new(data.to_vec());
1007        self.write_data(start, d, 0, len);
1008    }
1009}
1010    
/// Optimized implementation of [Storage] ( unix only ).
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    // Current logical file size, shared between clones of this storage.
    size: Arc<Mutex<u64>>,
    // Underlying operating-system file.
    f: fs::File,
}

#[cfg(target_family = "unix")]
impl UnixFileStorage {
    /// Construct from filename ( the file is created if it does not exist ).
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // Initial size comes from the file's metadata.
        let len = f.metadata().unwrap().len();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1036
1037#[cfg(target_family = "unix")]
1038impl Storage for UnixFileStorage {
1039    fn clone(&self) -> Box<dyn Storage> {
1040        Box::new(Self {
1041            size: self.size.clone(),
1042            f: self.f.try_clone().unwrap()
1043        })
1044    }
1045}
1046
1047#[cfg(target_family = "unix")]
1048use std::os::unix::fs::FileExt;
1049
1050#[cfg(target_family = "unix")]
1051impl BasicStorage for UnixFileStorage
1052{
1053    fn read(&self, start: u64, data: &mut [u8])
1054    {
1055        
1056        let _ = self.f.read_at(data, start );
1057    }
1058
1059    fn write(&mut self, start: u64, data: &[u8])
1060    {
1061        
1062        let _ = self.f.write_at(data, start );
1063    }
1064
1065    fn size(&self) -> u64 {
1066        *self.size.lock().unwrap()
1067    }
1068
1069    fn commit(&mut self, size: u64)
1070    {
1071        *self.size.lock().unwrap() = size;
1072        self.f.set_len(size).unwrap();
1073        self.f.sync_all().unwrap();
1074    }
1075}
1076
/// Optimized implementation of [Storage] ( windows only ).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    // Current logical file size, shared between clones of this storage.
    size: Arc<Mutex<u64>>,
    // Underlying operating-system file.
    f: fs::File,
}

#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename ( the file is created if it does not exist ).
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // Initial size comes from the file's metadata.
        let len = f.metadata().unwrap().len();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1102
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    // A clone shares the size counter and duplicates the OS file handle.
    fn clone(&self) -> Box<dyn Storage> {
        let f = self.f.try_clone().unwrap();
        let size = Arc::clone(&self.size);
        Box::new(Self { size, f })
    }
}
1112
1113#[cfg(target_family = "windows")]
1114use std::os::windows::fs::FileExt;
1115
1116#[cfg(target_family = "windows")]
1117impl BasicStorage for WindowsFileStorage
1118{
1119    fn read(&self, start: u64, data: &mut [u8])
1120    {
1121        
1122        let _ = self.f.seek_read(data, start );
1123    }
1124
1125    fn write(&mut self, start: u64, data: &[u8])
1126    {
1127        
1128        let _ = self.f.seek_write(data, start );
1129    }
1130
1131    fn size(&self) -> u64 {
1132        *self.size.lock().unwrap()
1133    }
1134
1135    fn commit(&mut self, size: u64)
1136    {
1137        *self.size.lock().unwrap() = size;
1138        self.f.set_len(size).unwrap();
1139        self.f.sync_all().unwrap();
1140    }
1141}
1142
/// Optimised [Storage] for the current platform ( windows: [WindowsFileStorage] ).
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Optimised [Storage] for the current platform ( unix: [UnixFileStorage] ).
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Fallback [Storage] used on platforms that are neither unix nor windows.
#[cfg(not(any(target_family = "unix",target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;
1154
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 ).
fn test_amount() -> usize {
    // unwrap_or_else avoids allocating the default string when TA is set
    // ( clippy::or_fun_call ).
    std::env::var("TA")
        .unwrap_or_else(|_| "1".to_string())
        .parse()
        .unwrap()
}
1160
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // The test checks that AtomicFile behaves exactly like a plain MemFile
    // under a random sequence of interleaved reads, writes and commits.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut af = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut reference = MemFile::new();

        for _ in 0..1000 * ta {
            // Draw offset, length and operation kind ( same RNG draw order
            // as always, so runs are comparable ).
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            if rng.r#gen::<bool>() {
                // Write the same random bytes to both files.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                af.write(off, &bytes);
                reference.write(off, &bytes);
            } else {
                // Read from both files and require identical results.
                let mut got = vec![0u8; len];
                let mut want = vec![0u8; len];
                af.read(off, &mut got);
                reference.read(off, &mut want);
                assert!(got == want);
            }
            // Occasionally commit both files at the same size.
            if rng.r#gen::<usize>() % 50 == 0 {
                af.commit(200);
                reference.commit(200);
            }
        }
    }
}