// atom_file/lib.rs
1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5#![deny(missing_docs)]
6
7use rustc_hash::FxHashMap as HashMap;
8use std::cmp::min;
9use std::sync::{Arc, Mutex, RwLock};
10
11#[cfg(feature = "pstd")]
12use pstd::collections::BTreeMap;
13#[cfg(not(feature = "pstd"))]
14use std::collections::BTreeMap;
15
/// Shared byte buffer: `Arc<Vec<u8>>`.
///
/// Cloning is cheap (reference-count bump), so the same bytes can be held in a
/// write map and also sent to the background save thread without copying.
pub type Data = Arc<Vec<u8>>;
18
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,Storage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates are written here.
    map: WMap,
    /// Underlying file, with previous updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size.
    size: u64,
    /// For sending update maps to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by update process while it is active.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map.
    map_lim: usize,
}

impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn Storage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());
        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // recv() fails (ending the loop and the thread) once the sending AtomicFile is dropped.
            while let Ok((size, map)) = rx.recv() {
                // Hold `busy` while saving so wait_complete can block on it.
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // One outstanding commit saved; may clear the CommitFile map.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}

impl Storage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            // No pending writes: only the in-memory size is updated.
            return;
        }
        // Back-pressure: if the CommitFile map has grown past the limit,
        // wait for the save thread to catch up (which resets the map).
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Read from the current (uncommitted) write map, filling gaps from the
        // CommitFile (committed but possibly not yet saved data + underlying storage).
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Loop until all outstanding commits are saved. Locking `busy` blocks
        // while the save thread is processing a batch, so this is not a pure
        // spin loop.
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
132
/// Committed-but-not-necessarily-saved state: buffered underlying storage
/// overlaid with the writes the save thread has yet to persist.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}

impl CommitFile {
    /// Construct from underlying storage; buf_mem is the memory allowed for read buffering.
    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
        Self {
            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
            map: WMap::default(),
            todo: 0,
        }
    }

    /// Called (by the save thread) when one commit has been fully saved.
    /// When no commits remain outstanding, the overlay map and the read
    /// buffers are discarded - subsequent reads come from underlying storage.
    fn done_one(&mut self) {
        self.todo -= 1;
        if self.todo == 0 {
            self.map = WMap::default();
            self.stg.reset();
        }
    }
}

impl Storage for CommitFile {
    /// Not supported - panics.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported - panics.
    fn size(&self) -> u64 {
        panic!()
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Committed updates overlay the buffered underlying storage.
        self.map.read(start, data, &self.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported - panics.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
181
/// Storage interface - Storage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait Storage: Send + Sync {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec. Provided method wraps the Vec in an [Arc] and calls [Storage::write_data].
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice. Provided method delegates to [Storage::write].
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 ( little-endian byte order ).
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 ( little-endian byte order ).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Clone. note: provided method panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }

    /// Wait until current writes are complete. Provided method does nothing.
    fn wait_complete(&self) {}
}
231
232/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
233#[derive(Default)]
234pub struct MemFile {
235    v: Arc<Mutex<Vec<u8>>>,
236}
237
238impl MemFile {
239    /// Get a new (boxed) MemFile.
240    pub fn new() -> Box<Self> {
241        Box::default()
242    }
243}
244
245impl Storage for MemFile {
246    fn size(&self) -> u64 {
247        let v = self.v.lock().unwrap();
248        v.len() as u64
249    }
250
251    fn read(&self, off: u64, bytes: &mut [u8]) {
252        let off = off as usize;
253        let len = bytes.len();
254        let mut v = self.v.lock().unwrap();
255        if off + len > v.len() {
256            v.resize(off + len, 0);
257        }
258        bytes.copy_from_slice(&v[off..off + len]);
259    }
260
261    fn write(&mut self, off: u64, bytes: &[u8]) {
262        let off = off as usize;
263        let len = bytes.len();
264        let mut v = self.v.lock().unwrap();
265        if off + len > v.len() {
266            v.resize(off + len, 0);
267        }
268        v[off..off + len].copy_from_slice(bytes);
269    }
270
271    fn commit(&mut self, size: u64) {
272        let mut v = self.v.lock().unwrap();
273        v.resize(size as usize, 0);
274    }
275
276    fn clone(&self) -> Box<dyn Storage> {
277        Box::new(Self { v: self.v.clone() })
278    }
279}
280
281use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
282
/// Non-shared wrapper over [`fs::File`] with offset-based read/write.
struct SimpleFileStorageInner {
    /// The underlying operating-system file.
    f: fs::File,
}

impl SimpleFileStorageInner {
    /// Construct from filename. The file is created if it does not exist
    /// and is never truncated on open.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Current file size in bytes (found by seeking to the end).
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read bytes starting at `off`. Reading past end-of-file leaves the
    /// tail of `bytes` untouched.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // `Read::read` may return fewer bytes than requested even when more
        // data is available, so loop until the buffer is full or end-of-file
        // (read returns 0) is reached. A single read call could short-read.
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                break; // End of file.
            }
            done += n;
        }
    }

    /// Write bytes starting at `off`, extending the file as necessary.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all guarantees the whole buffer is written; a bare `write`
        // may perform a short write and silently drop the remainder.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to `size` and sync data and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
328
329/// Simple implementation of [Storage] using [`std::fs::File`].
330pub struct SimpleFileStorage {
331    file: Arc<Mutex<SimpleFileStorageInner>>,
332}
333
334impl SimpleFileStorage {
335    /// Construct from filename.
336    pub fn new(filename: &str) -> Box<Self> {
337        Box::new(Self {
338            file: Arc::new(Mutex::new(SimpleFileStorageInner::new(filename))),
339        })
340    }
341}
342
343impl Storage for SimpleFileStorage {
344    fn size(&self) -> u64 {
345        self.file.lock().unwrap().size()
346    }
347
348    fn read(&self, off: u64, bytes: &mut [u8]) {
349        self.file.lock().unwrap().read(off, bytes);
350    }
351
352    fn write(&mut self, off: u64, bytes: &[u8]) {
353        self.file.lock().unwrap().write(off, bytes);
354    }
355
356    fn commit(&mut self, size: u64) {
357        self.file.lock().unwrap().commit(size);
358    }
359
360    fn clone(&self) -> Box<dyn Storage> {
361        Box::new(Self {
362            file: self.file.clone(),
363        })
364    }
365}
366
367/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
368#[allow(clippy::vec_box)]
369pub struct MultiFileStorage {
370    filename: String,
371    files: Arc<Mutex<Vec<Box<SimpleFileStorageInner>>>>,
372}
373
374impl MultiFileStorage {
375    /// Create new MultiFileStorage.
376    pub fn new(filename: &str) -> Box<Self> {
377        Box::new(Self {
378            filename: filename.to_owned(),
379            files: Arc::new(Mutex::new(Vec::new())),
380        })
381    }
382
383    fn get_file(&self) -> Box<SimpleFileStorageInner> {
384        match self.files.lock().unwrap().pop() {
385            Some(f) => f,
386            _ => Box::new(SimpleFileStorageInner::new(&self.filename)),
387        }
388    }
389
390    fn put_file(&self, f: Box<SimpleFileStorageInner>) {
391        self.files.lock().unwrap().push(f);
392    }
393}
394
395impl Storage for MultiFileStorage {
396    fn size(&self) -> u64 {
397        let mut f = self.get_file();
398        let result = f.size();
399        self.put_file(f);
400        result
401    }
402
403    fn read(&self, off: u64, bytes: &mut [u8]) {
404        let mut f = self.get_file();
405        f.read(off, bytes);
406        self.put_file(f);
407    }
408
409    fn write(&mut self, off: u64, bytes: &[u8]) {
410        let mut f = self.get_file();
411        f.write(off, bytes);
412        self.put_file(f);
413    }
414
415    fn commit(&mut self, size: u64) {
416        let mut f = self.get_file();
417        f.commit(size);
418        self.put_file(f);
419    }
420
421    fn clone(&self) -> Box<dyn Storage> {
422        Box::new(Self {
423            filename: self.filename.clone(),
424            files: self.files.clone(),
425        })
426    }
427}
428
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
pub struct DummyFile {}
impl DummyFile {
    /// Construct.
    pub fn new() -> Box<Self> {
        Box::new(Self {})
    }
}

impl Storage for DummyFile {
    /// Always reports size zero.
    fn size(&self) -> u64 {
        0
    }

    /// Reads nothing; the caller's buffer is left untouched.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// Discards the data.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// No-op.
    fn commit(&mut self, _size: u64) {}

    fn clone(&self) -> Box<dyn Storage> {
        Self::new()
    }
}
453
/// Memory configuration limits.
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    /// Defaults: map limit 5000 entries, 2MB read buffer, 1MB write buffers.
    fn default() -> Self {
        let mb = 0x100000; // 1MB.
        Limits {
            map_lim: 5000,
            rbuf_mem: 2 * mb,
            swbuf: mb,
            uwbuf: mb,
        }
    }
}
477
/// Write Buffer. Coalesces consecutive writes into fewer, larger writes
/// to the underlying storage.
struct WriteBuffer {
    /// Current write index into buf.
    ix: usize,
    /// File position of the start of the buffer.
    /// u64::MAX is a sentinel meaning "no position"; ix is always 0 then.
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn Storage>,
    /// Buffer.
    buf: Vec<u8>,
}

impl WriteBuffer {
    /// Construct with the specified buffer size.
    pub fn new(stg: Box<dyn Storage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX, // No position yet.
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to the specified offset.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // If the new write is not contiguous with buffered data, flush and
        // restart at off. (pos == u64::MAX implies ix == 0 - set only by new
        // and flush - so the sum cannot overflow.)
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // Space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: write it out and continue at the current offset.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write out any buffered data, then record new_pos as the buffer position.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Commit: flush buffered data, then commit underlying storage.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 ( little-endian byte order ).
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
543
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers.
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    /// Discard the buffers when dropped.
    fn drop(&mut self) {
        self.reset();
    }
}

impl<const N: usize> ReadBufStg<N> {
    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Clears the buffers. Called when buffered data may be stale.
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}

impl<const N: usize> Storage for ReadBufStg<N> {
    /// Read data from storage. Small reads (len <= limit) go through the
    /// buffer cache; larger reads bypass it.
    fn read(&self, start: u64, data: &mut [u8]) {
        if data.len() <= self.limit {
            self.buf.lock().unwrap().read(&*self.stg, start, data);
        } else {
            self.stg.read(start, data);
        }
    }

    /// Panics - not supported.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Panics - not supported.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!();
    }

    /// Panics - not supported.
    fn commit(&mut self, _size: u64) {
        panic!();
    }
}
605
/// Cache of fixed-size (N-byte) sectors read from underlying storage.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}

impl<const N: usize> ReadBuffer<N> {
    /// Construct with the specified buffer limit.
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read via the sector cache, fetching whole sectors from stg on a miss.
    fn read(&mut self, stg: &dyn Storage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64; // Absolute offset of next byte to read.
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize; // Displacement within the sector.
            let amount = min(data.len() - done, N - disp);

            // Fetch the whole sector from storage if it is not cached yet.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction: once the limit is reached, drop the whole cache.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
646
647#[derive(Default)]
648/// Slice of Data to be written to storage.
649struct DataSlice {
650    /// Slice data.
651    pub data: Data,
652    /// Start of slice.
653    pub off: usize,
654    /// Length of slice.
655    pub len: usize,
656}
657
658impl DataSlice {
659    /// Get reference to the whole slice.
660    pub fn all(&self) -> &[u8] {
661        &self.data[self.off..self.off + self.len]
662    }
663    /// Get reference to part of slice.
664    pub fn part(&self, off: usize, len: usize) -> &[u8] {
665        &self.data[self.off + off..self.off + off + len]
666    }
667    /// Trim specified amount from start of slice.
668    pub fn trim(&mut self, trim: usize) {
669        self.off += trim;
670        self.len -= trim;
671    }
672    /// Take the data.
673    #[allow(dead_code)]
674    pub fn take(&mut self) -> Data {
675        std::mem::take(&mut self.data)
676    }
677}
678
#[derive(Default)]
/// Updateable store based on some underlying storage.
///
/// Writes are kept in a BTreeMap keyed by the (exclusive) end offset of each
/// slice; the start is recovered as end - len. Stored slices never overlap:
/// a new write trims or removes any overlapping existing writes.
struct WMap {
    /// Map of writes. Key is the end of the slice.
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of (start, slice) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn Storage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only entries whose (exclusive) end is after `start` can overlap.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Mutations deferred until iteration is over (cannot modify the
            // map while range_mut borrows it).
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first entry that could overlap.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn Storage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Walk map entries that can overlap the requested range, in order.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Remaining tail past the last mapped write.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
836
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes.
    list: Vec<(u64, DataSlice)>,
    /// Size of main storage ( updated by commit ).
    size: u64,
}

impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates.
    ///
    /// upd acts as a redo log: bytes 0-7 hold the end position of the log
    /// (0 when empty), bytes 8-15 the committed size, followed by
    /// (start, len, data) records. A non-empty log means a previous phase-2
    /// did not complete; replay it into main storage, then clear the log.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // No outstanding log.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            // Each record: start (8 bytes), len (8 bytes), then len data bytes.
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        self.upd.commit(0); // Clear the log.
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    ///
    /// Phase 1 durably records the updates in the upd log ( writes beyond the
    /// current size go straight to main storage ). Phase 2 applies the logged
    /// updates to main storage and clears the log.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this makes the log valid for replay by init ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            self.upd.commit(0); // Clear the log.
        }
    }
}

impl Storage for BasicAtomicFile {
    fn commit(&mut self, size: u64) {
        self.commit_phase(size, 1);
        self.commit_phase(size, 2);
        self.size = size;
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Pending writes overlay buffered main storage.
        self.map.read(start, data, &*self.stg.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }
}
968
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 when unset ).
fn test_amount() -> usize {
    // unwrap_or_else avoids allocating the default String when TA is set.
    str::parse(&std::env::var("TA").unwrap_or_else(|_| "1".to_string())).unwrap()
}
974
#[test]
fn test_atomic_file() {
    use rand::Rng;
    /* Idea of test is to check AtomicFile and MemFile behave the same */

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        // s1 is the implementation under test, s2 the reference implementation.
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        // let mut s1 = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random short range within the first ~120 bytes.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Write the same random bytes to both implementations.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read from both and check the results agree.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
            // Occasionally commit both to a fixed size.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}