// atom_file/lib.rs
1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5use rustc_hash::FxHashMap as HashMap;
6use std::cmp::min;
7use std::sync::{Arc, Mutex, RwLock};
8
9#[cfg(feature = "pstd")]
10use pstd::collections::BTreeMap;
11#[cfg(not(feature = "pstd"))]
12use std::collections::BTreeMap;
13
/// Shared byte buffer: ```Arc<Vec<u8>>```. Cloning is cheap (a reference-count
/// bump), so the same write data can be held in several maps without copying.
pub type Data = Arc<Vec<u8>>;
16
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs the commit asynchronously on a background thread.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,Storage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// AtomicFile has two maps of writes. On commit, the latest batch of writes is sent to be written to underlying
/// storage, and is also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New (not yet committed) updates are written here.
    map: WMap,
    /// Underlying file, with previously committed but not yet saved updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size (as of the most recent commit).
    size: u64,
    /// For sending update maps to the background thread to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by the update (save) thread while it is active.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map; commit waits for saves when exceeded.
    map_lim: usize,
}
47
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn Storage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        // The BasicAtomicFile is moved into the save thread below and performs
        // the actual two-phase commits against (a clone of) stg.
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does the save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());
        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to the BasicAtomicFile.
            // The loop (and thread) exits when the sender (tx) is dropped.
            while let Ok((size, map)) = rx.recv() {
                // The guard lives inside the Result; holding it signals "busy" to wait_complete.
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
87
impl Storage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Back-pressure: if the CommitFile map has grown too large, wait for
        // outstanding saves to finish (which resets the map).
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Reads come first from the uncommitted map, falling back to the
        // CommitFile (which itself falls back to underlying storage).
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Loop until the save thread has processed all outstanding commits.
        // Acquiring busy blocks while the save thread is mid-commit, so this
        // is not a pure spin.
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
130
131/// Storage interface - Storage is some kind of "file" storage.
132///
133/// read and write methods take a start which is a byte offset in the underlying file.
134pub trait Storage: Send + Sync {
135    /// Get the size of the underlying storage.
136    /// Note : this is valid initially and after a commit but is not defined after write is called.
137    fn size(&self) -> u64;
138
139    /// Read data.
140    fn read(&self, start: u64, data: &mut [u8]);
141
142    /// Write byte slice to storage.
143    fn write(&mut self, start: u64, data: &[u8]);
144
145    /// Write byte Vec.
146    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
147        let len = data.len();
148        let d = Arc::new(data);
149        self.write_data(start, d, 0, len);
150    }
151
152    /// Write Data slice.
153    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
154        self.write(start, &data[off..off + len]);
155    }
156
157    /// Finish write transaction, size is new size of underlying storage.
158    fn commit(&mut self, size: u64);
159
160    /// Write u64.
161    fn write_u64(&mut self, start: u64, value: u64) {
162        self.write(start, &value.to_le_bytes());
163    }
164
165    /// Read u64.
166    fn read_u64(&self, start: u64) -> u64 {
167        let mut bytes = [0; 8];
168        self.read(start, &mut bytes);
169        u64::from_le_bytes(bytes)
170    }
171
172    /// Clone. note: provided method panics.
173    fn clone(&self) -> Box<dyn Storage> {
174        panic!()
175    }
176
177    /// Wait until current writes are complete.
178    fn wait_complete(&self) {}
179}
180
/// Simple implementation of [Storage] using `Vec<u8>`.
#[derive(Default)]
pub struct MemFile {
    /// Shared backing byte vector.
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Get a new (boxed) MemFile with empty contents.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
193
194impl Storage for MemFile {
195    fn size(&self) -> u64 {
196        let v = self.v.lock().unwrap();
197        v.len() as u64
198    }
199
200    fn read(&self, off: u64, bytes: &mut [u8]) {
201        let off = off as usize;
202        let len = bytes.len();
203        let mut v = self.v.lock().unwrap();
204        if off + len > v.len() {
205            v.resize(off + len, 0);
206        }
207        bytes.copy_from_slice(&v[off..off + len]);
208    }
209
210    fn write(&mut self, off: u64, bytes: &[u8]) {
211        let off = off as usize;
212        let len = bytes.len();
213        let mut v = self.v.lock().unwrap();
214        if off + len > v.len() {
215            v.resize(off + len, 0);
216        }
217        v[off..off + len].copy_from_slice(bytes);
218    }
219
220    fn commit(&mut self, size: u64) {
221        let mut v = self.v.lock().unwrap();
222        v.resize(size as usize, 0);
223    }
224
225    fn clone(&self) -> Box<dyn Storage> {
226        Box::new(Self { v: self.v.clone() })
227    }
228}
229
230use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
231
/// Simple implementation of [Storage] using `std::fs::File`.
pub struct SimpleFileStorage {
    /// Shared file handle; the Mutex serialises seek+read / seek+write pairs.
    file: Arc<Mutex<fs::File>>,
}

impl SimpleFileStorage {
    /// Construct from filename. The file is created if it does not exist and is
    /// opened for both reading and writing without truncation.
    ///
    /// # Panics
    /// Panics (with the filename and OS error) if the file cannot be opened.
    pub fn new(filename: &str) -> Box<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap_or_else(|e| panic!("SimpleFileStorage failed to open {}: {}", filename, e));
        Box::new(Self {
            file: Arc::new(Mutex::new(file)),
        })
    }
}
253
254impl Storage for SimpleFileStorage {
255    fn size(&self) -> u64 {
256        let mut f = self.file.lock().unwrap();
257        f.seek(SeekFrom::End(0)).unwrap()
258    }
259
260    fn read(&self, off: u64, bytes: &mut [u8]) {
261        let mut f = self.file.lock().unwrap();
262        f.seek(SeekFrom::Start(off)).unwrap();
263        let _ = f.read(bytes).unwrap();
264    }
265
266    fn write(&mut self, off: u64, bytes: &[u8]) {
267        let mut f = self.file.lock().unwrap();
268        // The list of operating systems which auto-zero is likely more than this...research is todo.
269        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
270        {
271            let size = f.seek(SeekFrom::End(0)).unwrap();
272            if off > size {
273                f.set_len(off).unwrap();
274                /*
275                let len = (off - size) as usize;
276                let zb = vec![0; len];
277                f.seek(SeekFrom::Start(size)).unwrap();
278                let _ = f.write(&zb).unwrap();
279                */
280            }
281        }
282        f.seek(SeekFrom::Start(off)).unwrap();
283        let _ = f.write(bytes).unwrap();
284    }
285
286    fn commit(&mut self, size: u64) {
287        let f = self.file.lock().unwrap();
288        f.set_len(size).unwrap();
289        f.sync_all().unwrap();
290    }
291
292    fn clone(&self) -> Box<dyn Storage> {
293        Box::new(Self {
294            file: self.file.clone(),
295        })
296    }
297}
298
299/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads/writes by different threads.
300#[allow(clippy::vec_box)]
301pub struct MultiFileStorage {
302    filename: String,
303    files: Arc<Mutex<Vec<Box<SimpleFileStorage>>>>,
304}
305
306impl MultiFileStorage {
307    /// Create new MultiFileStorage.
308    pub fn new(filename: &str) -> Box<Self> {
309        Box::new(Self {
310            filename: filename.to_string(),
311            files: Arc::new(Mutex::new(Vec::new())),
312        })
313    }
314
315    fn get_file(&self) -> Box<SimpleFileStorage> {
316        match self.files.lock().unwrap().pop() {
317            Some(f) => f,
318            _ => SimpleFileStorage::new(&self.filename),
319        }
320    }
321
322    fn put_file(&self, f: Box<SimpleFileStorage>) {
323        self.files.lock().unwrap().push(f);
324    }
325}
326
327impl Storage for MultiFileStorage {
328    fn size(&self) -> u64 {
329        let f = self.get_file();
330        let result = f.size();
331        self.put_file(f);
332        result
333    }
334
335    fn read(&self, off: u64, bytes: &mut [u8]) {
336        let f = self.get_file();
337        f.read(off, bytes);
338        self.put_file(f);
339    }
340
341    fn write(&mut self, off: u64, bytes: &[u8]) {
342        let mut f = self.get_file();
343        f.write(off, bytes);
344        self.put_file(f);
345    }
346
347    fn commit(&mut self, size: u64) {
348        let mut f = self.get_file();
349        f.commit(size);
350        self.put_file(f);
351    }
352
353    fn clone(&self) -> Box<dyn Storage> {
354        Box::new(Self {
355            filename: self.filename.clone(),
356            files: self.files.clone(),
357        })
358    }
359}
360
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
pub struct DummyFile {}

impl DummyFile {
    /// Construct a boxed DummyFile.
    pub fn new() -> Box<Self> {
        Box::new(DummyFile {})
    }
}
369
370impl Storage for DummyFile {
371    fn size(&self) -> u64 {
372        0
373    }
374
375    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
376
377    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
378
379    fn commit(&mut self, _size: u64) {}
380
381    fn clone(&self) -> Box<dyn Storage> {
382        Self::new()
383    }
384}
385
/// Memory configuration limits.
///
/// Construct via [Limits::default] and adjust individual fields as required.
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}
398
399impl Default for Limits {
400    fn default() -> Self {
401        Self {
402            map_lim: 5000,
403            rbuf_mem: 0x200000,
404            swbuf: 0x100000,
405            uwbuf: 0x100000,
406        }
407    }
408}
409
/// Committed-but-not-yet-saved view of the file, shared between the
/// AtomicFile front end and the background save thread.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates not yet applied to underlying storage.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
418
419impl CommitFile {
420    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
421        Self {
422            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
423            map: WMap::default(),
424            todo: 0,
425        }
426    }
427
428    fn done_one(&mut self) {
429        self.todo -= 1;
430        if self.todo == 0 {
431            self.map = WMap::default();
432            self.stg.reset();
433        }
434    }
435}
436
437impl Storage for CommitFile {
438    fn commit(&mut self, _size: u64) {
439        panic!()
440    }
441
442    fn size(&self) -> u64 {
443        panic!()
444    }
445
446    fn read(&self, start: u64, data: &mut [u8]) {
447        self.map.read(start, data, &self.stg);
448    }
449
450    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
451        self.map.write(start, data, off, len);
452    }
453
454    fn write(&mut self, _start: u64, _data: &[u8]) {
455        panic!()
456    }
457}
458
/// Write Buffer - coalesces sequential writes before passing them to underlying storage.
struct WriteBuffer {
    /// Current write index into buf.
    ix: usize,
    /// File position corresponding to the start of buf (u64::MAX when unset).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn Storage>,
    /// Buffer.
    buf: Vec<u8>,
}
470
impl WriteBuffer {
    /// Construct with a buffer of buf_size bytes.
    pub fn new(stg: Box<dyn Storage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX, // No position yet: the first write always flushes (resets pos) first.
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset, buffering sequential writes.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // If this write is not contiguous with the buffered data, flush first.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // Space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: flush and start a new buffer at the current position.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write any buffered data to underlying storage, then set the buffer position to new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Flush buffered data and commit underlying storage.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 in little-endian byte order.
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
524
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers, keyed by sector number.
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    /// Release cached buffer memory when the storage is dropped.
    fn drop(&mut self) {
        self.reset();
    }
}
544
545impl<const N: usize> ReadBufStg<N> {
546    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
547    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
548        Self {
549            stg,
550            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
551            limit,
552        }
553    }
554
555    /// Clears the buffers.
556    fn reset(&mut self) {
557        self.buf.lock().unwrap().reset();
558    }
559}
560
561impl<const N: usize> Storage for ReadBufStg<N> {
562    /// Read data from storage.
563    fn read(&self, start: u64, data: &mut [u8]) {
564        if data.len() <= self.limit {
565            self.buf.lock().unwrap().read(&*self.stg, start, data);
566        } else {
567            self.stg.read(start, data);
568        }
569    }
570
571    /// Panics.
572    fn size(&self) -> u64 {
573        panic!()
574    }
575
576    /// Panics.
577    fn write(&mut self, _start: u64, _data: &[u8]) {
578        panic!();
579    }
580
581    /// Panics.
582    fn commit(&mut self, _size: u64) {
583        panic!();
584    }
585}
586
/// Cache of fixed-size sector buffers used by ReadBufStg.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}
593
594impl<const N: usize> ReadBuffer<N> {
595    fn new(max_buf: usize) -> Self {
596        Self {
597            map: HashMap::default(),
598            max_buf,
599        }
600    }
601
602    fn reset(&mut self) {
603        self.map.clear();
604    }
605
606    fn read(&mut self, stg: &dyn Storage, off: u64, data: &mut [u8]) {
607        let mut done = 0;
608        while done < data.len() {
609            let off = off + done as u64;
610            let sector = off / N as u64;
611            let disp = (off % N as u64) as usize;
612            let amount = min(data.len() - done, N - disp);
613
614            let p = self.map.entry(sector).or_insert_with(|| {
615                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
616                stg.read(sector * N as u64, &mut *p);
617                p
618            });
619            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
620            done += amount;
621        }
622        if self.map.len() >= self.max_buf {
623            self.reset();
624        }
625    }
626}
627
628#[derive(Default)]
629/// Slice of Data to be written to storage.
630struct DataSlice {
631    /// Slice data.
632    pub data: Data,
633    /// Start of slice.
634    pub off: usize,
635    /// Length of slice.
636    pub len: usize,
637}
638
639impl DataSlice {
640    /// Get reference to the whole slice.
641    pub fn all(&self) -> &[u8] {
642        &self.data[self.off..self.off + self.len]
643    }
644    /// Get reference to part of slice.
645    pub fn part(&self, off: usize, len: usize) -> &[u8] {
646        &self.data[self.off + off..self.off + off + len]
647    }
648    /// Trim specified amount from start of slice.
649    pub fn trim(&mut self, trim: usize) {
650        self.off += trim;
651        self.len -= trim;
652    }
653    /// Take the data.
654    #[allow(dead_code)]
655    pub fn take(&mut self) -> Data {
656        std::mem::take(&mut self.data)
657    }
658}
659
#[derive(Default)]
/// Updateable storage based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end offset of the slice (start = key - len),
    /// which lets range queries find all writes overlapping a given offset.
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of (start, slice) pairs, in ascending start order.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn Storage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            // Inserts/removals are deferred: the map cannot be structurally
            // mutated while iterating with range_mut.
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // range_mut(start + 1..) visits every existing write whose end is
            // after start, i.e. every write that could overlap [start, end).
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            // Finally insert the new write itself.
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    /// Uses a cursor so overlaps are adjusted in a single pass without deferred lists.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first existing write whose end is after start.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn Storage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Visit every write whose end is after start, in ascending order.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Any remaining tail comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
817
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit (the journal).
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes (converted from map during phase 1 of commit).
    list: Vec<(u64, DataSlice)>,
    /// Committed size of the main storage.
    size: u64,
}
830
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates ( crash recovery: replay the journal in upd, if any ).
    fn init(&mut self) {
        // Journal header: bytes [0..8) = end position of journal, [8..16) = committed file size.
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // End position zero means there is no valid journal to replay.
            return;
        }
        assert!(end == self.upd.stg.size());
        // Journal records follow the header as (start, len, bytes) triples.
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Truncate the journal now that it has been applied.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    ///
    /// Phase 1 journals the updates to upd; phase 2 applies them to stg and
    /// clears the journal. If a crash occurs after phase 1 completed, init
    /// replays the journal on the next startup.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero ( so a partial journal is ignored on recovery ).
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position, making the journal valid for recovery.
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            // Invalidate the journal ( updates are now fully applied to stg ).
            self.upd.commit(0);
        }
    }
}
923
924impl Storage for BasicAtomicFile {
925    fn commit(&mut self, size: u64) {
926        self.commit_phase(size, 1);
927        self.commit_phase(size, 2);
928        self.size = size;
929    }
930
931    fn size(&self) -> u64 {
932        self.size
933    }
934
935    fn read(&self, start: u64, data: &mut [u8]) {
936        self.map.read(start, data, &*self.stg.stg);
937    }
938
939    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
940        self.map.write(start, data, off, len);
941    }
942
943    fn write(&mut self, start: u64, data: &[u8]) {
944        let len = data.len();
945        let d = Arc::new(data.to_vec());
946        self.write_data(start, d, 0, len);
947    }
948}
949
#[cfg(test)]
/// Get amount of testing from environment variable TA.
/// Defaults to 1 when TA is unset or not a valid number (avoids an eager
/// String allocation and a panic on malformed input).
fn test_amount() -> usize {
    std::env::var("TA")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(1)
}
955
#[test]
fn test_atomic_file() {
    use rand::Rng;
    /* Idea of test is to check AtomicFile and MemFile behave the same */

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        // s1 is the implementation under test, s2 is the reference implementation.
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Small random offsets/lengths keep the writes overlapping,
            // exercising the WMap trim/remove logic.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Apply the same random write to both storages.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read from both storages and check they agree.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
        }
        s1.commit(200);
        s2.commit(200);
    }
}