// atom_file/lib.rs
//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
//!
//! [`BasicAtomicFile`] is a non-async alternative.

5use rustc_hash::FxHashMap as HashMap;
6use std::cmp::min;
7use std::sync::{Arc, Mutex, RwLock};
8
9#[cfg(feature = "pstd")]
10use pstd::collections::BTreeMap;
11#[cfg(not(feature = "pstd"))]
12use std::collections::BTreeMap;
13
/// Shared, cheaply-cloneable byte buffer: ```Arc<Vec<u8>>```.
pub type Data = Arc<Vec<u8>>;
16
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,Storage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New ( uncommitted ) updates are written here.
    map: WMap,
    /// Underlying file, with previous ( committed but not yet saved ) updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size, as set by the last commit.
    size: u64,
    /// For sending update maps to the background save thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by the save thread while it is processing a commit; used by wait_complete.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map; commit waits for saves once exceeded.
    map_lim: usize,
}
47
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn Storage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        // The BasicAtomicFile is owned ( moved into ) the save thread below;
        // it shares the underlying storage with the CommitFile via clone.
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());
        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // Exits when the channel sender ( owned by the AtomicFile ) is dropped.
            while let Ok((size, map)) = rx.recv() {
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Tell the CommitFile one outstanding commit has been saved.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
87
impl Storage for AtomicFile {
    /// Commit the current map of writes: apply it to the CommitFile and send
    /// it to the save thread to be written to underlying storage.
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Bound memory use: if too many committed-but-unsaved updates have
        // accumulated, block until the save thread catches up.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        // NOTE(review): the write guard appears to live to the end of this fn
        // ( temporary lifetime extension ), so todo/map/send stay in step.
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    /// Read, with uncommitted writes overlaid on the CommitFile.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    /// Spin until the save thread has applied all outstanding commits.
    /// Each iteration blocks on the busy lock, which the save thread
    /// holds while processing a commit.
    fn wait_complete(&self) {
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
130
/// Underlying storage with the map of committed ( but not yet saved ) updates overlaid.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates, cleared once all commits are saved.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
139
140impl CommitFile {
141    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
142        Self {
143            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
144            map: WMap::default(),
145            todo: 0,
146        }
147    }
148
149    fn done_one(&mut self) {
150        self.todo -= 1;
151        if self.todo == 0 {
152            self.map = WMap::default();
153            self.stg.reset();
154        }
155    }
156}
157
impl Storage for CommitFile {
    /// Not used ( commits are tracked via todo/done_one ). Panics.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not used. Panics.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Read, with committed ( but not yet saved ) writes overlaid on buffered storage.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &self.stg);
    }

    /// Record a committed write in the map.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not used ( write_data is used instead ). Panics.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
179
/// Storage interface - Storage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait Storage: Send + Sync {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec. Provided method converts to [Data] and calls write_data.
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice. Provided method delegates to write.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 ( little-endian encoding ).
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 ( little-endian encoding ).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Clone. note: provided method panics; implementations that support
    /// shared access override it.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }

    /// Wait until current writes are complete. Provided method is a no-op.
    fn wait_complete(&self) {}
}
229
/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>>`.
#[derive(Default)]
pub struct MemFile {
    /// File contents; shared so clones see the same data.
    v: Arc<Mutex<Vec<u8>>>,
}
235
236impl MemFile {
237    /// Get a new (boxed) MemFile.
238    pub fn new() -> Box<Self> {
239        Box::default()
240    }
241}
242
impl Storage for MemFile {
    fn size(&self) -> u64 {
        let v = self.v.lock().unwrap();
        v.len() as u64
    }

    /// Read bytes. Note: a read past the current end first grows the file
    /// with zeros, so such reads return zero bytes ( and increase size ).
    fn read(&self, off: u64, bytes: &mut [u8]) {
        let off = off as usize;
        let len = bytes.len();
        let mut v = self.v.lock().unwrap();
        if off + len > v.len() {
            v.resize(off + len, 0);
        }
        bytes.copy_from_slice(&v[off..off + len]);
    }

    /// Write bytes, growing the file with zeros first if needed.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        let off = off as usize;
        let len = bytes.len();
        let mut v = self.v.lock().unwrap();
        if off + len > v.len() {
            v.resize(off + len, 0);
        }
        v[off..off + len].copy_from_slice(bytes);
    }

    /// Truncate or zero-extend to the given size.
    fn commit(&mut self, size: u64) {
        let mut v = self.v.lock().unwrap();
        v.resize(size as usize, 0);
    }

    /// Clones share the same underlying Vec.
    fn clone(&self) -> Box<dyn Storage> {
        Box::new(Self { v: self.v.clone() })
    }
}
278
279use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
280
/// Simple implementation of [Storage] using [`std::fs::File`].
pub struct SimpleFileStorage {
    /// The open file; Mutex serializes seek+read/write pairs, Arc allows clone to share it.
    file: Arc<Mutex<fs::File>>,
}
285
impl SimpleFileStorage {
    /// Construct from filename. Creates the file if it does not exist;
    /// existing contents are preserved ( truncate(false) ).
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: Arc::new(Mutex::new(
                OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .truncate(false)
                    .open(filename)
                    .unwrap(),
            )),
        })
    }
}
302
303impl Storage for SimpleFileStorage {
304    fn size(&self) -> u64 {
305        let mut f = self.file.lock().unwrap();
306        f.seek(SeekFrom::End(0)).unwrap()
307    }
308
309    fn read(&self, off: u64, bytes: &mut [u8]) {
310        let mut f = self.file.lock().unwrap();
311        f.seek(SeekFrom::Start(off)).unwrap();
312        let _ = f.read(bytes).unwrap();
313    }
314
315    fn write(&mut self, off: u64, bytes: &[u8]) {
316        let mut f = self.file.lock().unwrap();
317        // The list of operating systems which auto-zero is likely more than this...research is todo.
318        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
319        {
320            let size = f.seek(SeekFrom::End(0)).unwrap();
321            if off > size {
322                f.set_len(off).unwrap();
323            }
324        }
325        f.seek(SeekFrom::Start(off)).unwrap();
326        let _ = f.write(bytes).unwrap();
327    }
328
329    fn commit(&mut self, size: u64) {
330        let f = self.file.lock().unwrap();
331        f.set_len(size).unwrap();
332        f.sync_all().unwrap();
333    }
334
335    fn clone(&self) -> Box<dyn Storage> {
336        Box::new(Self {
337            file: self.file.clone(),
338        })
339    }
340}
341
/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
#[allow(clippy::vec_box)]
pub struct MultiFileStorage {
    /// Name of the file; each pooled handle opens it independently.
    filename: String,
    /// Pool of open file handles, shared between clones.
    files: Arc<Mutex<Vec<Box<SimpleFileStorage>>>>,
}
348
349impl MultiFileStorage {
350    /// Create new MultiFileStorage.
351    pub fn new(filename: &str) -> Box<Self> {
352        Box::new(Self {
353            filename: filename.to_owned(),
354            files: Arc::new(Mutex::new(Vec::new())),
355        })
356    }
357
358    fn get_file(&self) -> Box<SimpleFileStorage> {
359        match self.files.lock().unwrap().pop() {
360            Some(f) => f,
361            _ => SimpleFileStorage::new(&self.filename),
362        }
363    }
364
365    fn put_file(&self, f: Box<SimpleFileStorage>) {
366        self.files.lock().unwrap().push(f);
367    }
368}
369
impl Storage for MultiFileStorage {
    fn size(&self) -> u64 {
        // Borrow a handle from the pool for the duration of the operation.
        let f = self.get_file();
        let result = f.size();
        self.put_file(f);
        result
    }

    fn read(&self, off: u64, bytes: &mut [u8]) {
        let f = self.get_file();
        f.read(off, bytes);
        self.put_file(f);
    }

    fn write(&mut self, off: u64, bytes: &[u8]) {
        let mut f = self.get_file();
        f.write(off, bytes);
        self.put_file(f);
    }

    fn commit(&mut self, size: u64) {
        let mut f = self.get_file();
        f.commit(size);
        self.put_file(f);
    }

    /// Clones share the pool of open handles.
    fn clone(&self) -> Box<dyn Storage> {
        Box::new(Self {
            filename: self.filename.clone(),
            files: self.files.clone(),
        })
    }
}
403
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
pub struct DummyFile {}
impl DummyFile {
    /// Construct.
    pub fn new() -> Box<Self> {
        Box::new(Self {})
    }
}

impl Storage for DummyFile {
    /// Always reports size 0.
    fn size(&self) -> u64 {
        0
    }

    /// Reads nothing ( buffer left unchanged ).
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// Discards the write.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// No-op.
    fn commit(&mut self, _size: u64) {}

    fn clone(&self) -> Box<dyn Storage> {
        Self::new()
    }
}
428
/// Memory configuration limits.
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}
441
442impl Default for Limits {
443    fn default() -> Self {
444        Self {
445            map_lim: 5000,
446            rbuf_mem: 0x200000,
447            swbuf: 0x100000,
448            uwbuf: 0x100000,
449        }
450    }
451}
452
/// Write Buffer. Coalesces contiguous writes into a single write to underlying storage.
struct WriteBuffer {
    /// Current write index into buf.
    ix: usize,
    /// Current file position of the start of buf; u64::MAX means "no position yet".
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn Storage>,
    /// Buffer of pending bytes ( buf[0..ix] is valid ).
    buf: Vec<u8>,
}
464
impl WriteBuffer {
    /// Construct. pos starts at u64::MAX ( no buffered position yet ).
    pub fn new(stg: Box<dyn Storage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset,
    /// buffering it and flushing when the write is not contiguous with
    /// previously buffered data or the buffer fills.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // When pos == u64::MAX ( after new or commit ) ix is always 0,
        // so pos + ix does not overflow here.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // Space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: write it out and continue from the current offset.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write any buffered bytes to storage and reposition the buffer at new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Commit: flush buffered bytes, then commit underlying storage.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 ( little-endian encoding ).
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
518
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers ( Mutex since read takes &self ).
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small; larger reads bypass the buffers.
    limit: usize,
}
532
impl<const N: usize> Drop for ReadBufStg<N> {
    /// Clears the buffers on drop.
    // NOTE(review): the map would be freed by drop anyway; presumably this is
    // for explicitness — confirm no other intent.
    fn drop(&mut self) {
        self.reset();
    }
}
538
impl<const N: usize> ReadBufStg<N> {
    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Clears the buffers ( called when cached data may be stale ).
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}
554
impl<const N: usize> Storage for ReadBufStg<N> {
    /// Read data from storage. Small reads ( up to limit bytes ) go through
    /// the sector buffers; larger reads bypass them.
    fn read(&self, start: u64, data: &mut [u8]) {
        if data.len() <= self.limit {
            self.buf.lock().unwrap().read(&*self.stg, start, data);
        } else {
            self.stg.read(start, data);
        }
    }

    /// Panics.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Panics.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!();
    }

    /// Panics.
    fn commit(&mut self, _size: u64) {
        panic!();
    }
}
580
/// Cache of N-byte sectors read from underlying storage.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}
587
impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read via the sector cache; whole N-byte sectors are fetched from stg on a miss.
    fn read(&mut self, stg: &dyn Storage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64; // Absolute offset of the next byte wanted.
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize; // Displacement within the sector.
            // Bytes to take from this sector, capped by what is still wanted.
            let amount = min(data.len() - done, N - disp);

            let p = self.map.entry(sector).or_insert_with(|| {
                // Cache miss: read the whole sector from underlying storage.
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude cache bound: once the buffer count reaches max_buf, drop everything.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
621
#[derive(Default)]
/// Slice of Data to be written to storage.
struct DataSlice {
    /// Slice data ( shared, reference-counted ).
    pub data: Data,
    /// Start of slice within data.
    pub off: usize,
    /// Length of slice.
    pub len: usize,
}
632
633impl DataSlice {
634    /// Get reference to the whole slice.
635    pub fn all(&self) -> &[u8] {
636        &self.data[self.off..self.off + self.len]
637    }
638    /// Get reference to part of slice.
639    pub fn part(&self, off: usize, len: usize) -> &[u8] {
640        &self.data[self.off + off..self.off + off + len]
641    }
642    /// Trim specified amount from start of slice.
643    pub fn trim(&mut self, trim: usize) {
644        self.off += trim;
645        self.len -= trim;
646    }
647    /// Take the data.
648    #[allow(dead_code)]
649    pub fn take(&mut self) -> Data {
650        std::mem::take(&mut self.data)
651    }
652}
653
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end ( exclusive byte offset ) of the slice.
    map: BTreeMap<u64, DataSlice>,
}
660
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs, ordered by start.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are slice ends; recover the start offset.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn Storage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only entries whose end key is > start can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Removals/insertions are deferred until after iteration since the
            // map cannot be modified while range_mut borrows it.
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    /// Cursor-based variant using the pstd BTreeMap, which allows mutation during traversal.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn Storage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Walk writes whose end is past start, in ascending order.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Tail beyond the last mapped write comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
811
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of pending writes.
    map: WMap,
    /// List of writes ( built in commit phase 1, applied in phase 2 ).
    list: Vec<(u64, DataSlice)>,
    /// File size as of the last commit.
    size: u64,
}
824
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates ( crash recovery ).
    /// upd layout: offset 0 holds the end position of the update records
    /// ( 0 means none ), offset 8 holds the committed file size, and
    /// ( start, len, bytes ) records follow from offset 16.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        // Replay each ( start, len, bytes ) record into main storage.
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Replay done; truncate upd so a second init is a no-op.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    /// Phase 1 durably records the updates in upd; phase 2 applies them to stg
    /// and clears upd. A crash after phase 1 is recovered by init.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this marks the record set as complete ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            // Clear upd ( end position reverts to 0 ), commit is complete.
            self.upd.commit(0);
        }
    }
}
917
impl Storage for BasicAtomicFile {
    /// Commit pending writes using two-phase commit.
    fn commit(&mut self, size: u64) {
        self.commit_phase(size, 1);
        self.commit_phase(size, 2);
        self.size = size;
    }

    fn size(&self) -> u64 {
        self.size
    }

    /// Read, with pending writes overlaid on buffered main storage.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &*self.stg.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }
}
943
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 ).
fn test_amount() -> usize {
    // unwrap_or_else avoids allocating the default String when TA is set.
    std::env::var("TA")
        .unwrap_or_else(|_| "1".to_string())
        .parse()
        .unwrap()
}
949
#[test]
fn test_atomic_file() {
    use rand::Rng;
    /* Idea of test is to check AtomicFile and MemFile behave the same */

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        // s1 is the implementation under test, s2 is the reference implementation.
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new() );
        // let mut s1 = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random offset ( 0..100 ) and length ( 1..=20 ) for each operation.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Write the same random bytes to both files.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read from both files and check the results agree.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
            // Occasionally commit both files to the same size.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}