// atom_file/lib.rs
1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5#[cfg(feature = "pstd")]
6use pstd::collections::BTreeMap;
7use rustc_hash::FxHashMap as HashMap;
8use std::cmp::min;
9use std::sync::{Arc, Mutex, RwLock};
10
11#[cfg(not(feature = "pstd"))]
12use std::collections::BTreeMap;
13
/// Shared, immutable byte buffer: ```Arc<Vec<u8>>```. Cloning is cheap (refcount bump).
pub type Data = Arc<Vec<u8>>;
16
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,Storage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
pub struct AtomicFile {
    /// Outstanding writes not yet queued for commit.
    map: WMap,
    /// State shared with the background commit thread.
    cf: Arc<RwLock<CommitFile>>,
    /// Current logical size of the file.
    size: u64,
    /// Sends (new size, write map) pairs to the background commit thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Lock held by the background thread while a save is in progress.
    busy: Arc<Mutex<()>>,
    /// Once the commit-file map exceeds this, commit waits for pending saves.
    map_lim: usize,
}
37
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn Storage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        // The BasicAtomicFile owns a clone of stg and performs the actual two-phase commits.
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
        // Channel used to hand (new size, write map) pairs to the save thread.
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());
        std::thread::spawn(move || {
            // Loop ends (and the thread exits) when the sender is dropped.
            while let Ok((size, map)) = rx.recv() {
                // Note: the lock Result is intentionally unused (guard held for scope only).
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Decrement the pending-commit count; clears shared buffers at zero.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
76
impl Storage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Bound the number of writes accumulated in the shared commit-file map.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        // Take the local write map and queue it for the background save thread.
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Merge the writes into the commit-file map so reads see them
        // until the background commit finishes.
        map.to_storage(cf);
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Local (uncommitted) writes take precedence; unwritten ranges fall
        // back to the commit file, which falls back to underlying storage.
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Loop until the background thread has drained all queued commits.
        // Locking `busy` blocks while a save is actively in progress.
        while self.cf.read().unwrap().todo != 0 {
            #[cfg(feature = "log")]
            println!("AtomicFile::wait_complete - waiting for writer process");
            let _x = self.busy.lock();
        }
    }
}
119
/// Storage interface - Storage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait Storage: Send + Sync {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec. Default delegates to [Storage::write_data].
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice. Default delegates to [Storage::write].
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 in little-endian byte order.
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 in little-endian byte order.
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Clone. Note: provided method panics; override for shareable storage.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }

    /// Wait until current writes are complete. Default is a no-op.
    fn wait_complete(&self) {}
}
169
/// Simple implementation of [Storage] using `Vec<u8>`.
#[derive(Default)]
pub struct MemFile {
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Construct a new, empty, boxed MemFile.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
182
183impl Storage for MemFile {
184    fn size(&self) -> u64 {
185        let v = self.v.lock().unwrap();
186        v.len() as u64
187    }
188
189    fn read(&self, off: u64, bytes: &mut [u8]) {
190        let off = off as usize;
191        let len = bytes.len();
192        let mut v = self.v.lock().unwrap();
193        if off + len > v.len() {
194            v.resize(off + len, 0);
195        }
196        bytes.copy_from_slice(&v[off..off + len]);
197    }
198
199    fn write(&mut self, off: u64, bytes: &[u8]) {
200        let off = off as usize;
201        let len = bytes.len();
202        let mut v = self.v.lock().unwrap();
203        if off + len > v.len() {
204            v.resize(off + len, 0);
205        }
206        v[off..off + len].copy_from_slice(bytes);
207    }
208
209    fn commit(&mut self, size: u64) {
210        let mut v = self.v.lock().unwrap();
211        v.resize(size as usize, 0);
212    }
213
214    fn clone(&self) -> Box<dyn Storage> {
215        Box::new(Self { v: self.v.clone() })
216    }
217}
218
219use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
220
/// Simple implementation of [Storage] using `std::fs::File`.
pub struct SimpleFileStorage {
    file: Arc<Mutex<fs::File>>,
}

impl SimpleFileStorage {
    /// Open (creating if absent, without truncating) the named file.
    pub fn new(filename: &str) -> Box<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        Box::new(Self {
            file: Arc::new(Mutex::new(file)),
        })
    }
}
242
243impl Storage for SimpleFileStorage {
244    fn size(&self) -> u64 {
245        let mut f = self.file.lock().unwrap();
246        f.seek(SeekFrom::End(0)).unwrap()
247    }
248
249    fn read(&self, off: u64, bytes: &mut [u8]) {
250        let mut f = self.file.lock().unwrap();
251        f.seek(SeekFrom::Start(off)).unwrap();
252        let _ = f.read(bytes).unwrap();
253    }
254
255    fn write(&mut self, off: u64, bytes: &[u8]) {
256        let mut f = self.file.lock().unwrap();
257        // The list of operating systems which auto-zero is likely more than this...research is todo.
258        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
259        {
260            let size = f.seek(SeekFrom::End(0)).unwrap();
261            if off > size {
262                f.set_len(off).unwrap();
263                /*
264                let len = (off - size) as usize;
265                let zb = vec![0; len];
266                f.seek(SeekFrom::Start(size)).unwrap();
267                let _ = f.write(&zb).unwrap();
268                */
269            }
270        }
271        f.seek(SeekFrom::Start(off)).unwrap();
272        let _ = f.write(bytes).unwrap();
273    }
274
275    fn commit(&mut self, size: u64) {
276        let f = self.file.lock().unwrap();
277        f.set_len(size).unwrap();
278        f.sync_all().unwrap();
279    }
280
281    fn clone(&self) -> Box<dyn Storage> {
282        Box::new(Self {
283            file: self.file.clone(),
284        })
285    }
286}
287
288/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads/writes by different threads.
289#[allow(clippy::vec_box)]
290pub struct MultiFileStorage {
291    filename: String,
292    files: Arc<Mutex<Vec<Box<SimpleFileStorage>>>>,
293}
294
295impl MultiFileStorage {
296    /// Create new MultiFileStorage.
297    pub fn new(filename: &str) -> Box<Self> {
298        Box::new(Self {
299            filename: filename.to_string(),
300            files: Arc::new(Mutex::new(Vec::new())),
301        })
302    }
303
304    fn get_file(&self) -> Box<SimpleFileStorage> {
305        match self.files.lock().unwrap().pop() {
306            Some(f) => f,
307            _ => SimpleFileStorage::new(&self.filename),
308        }
309    }
310
311    fn put_file(&self, f: Box<SimpleFileStorage>) {
312        self.files.lock().unwrap().push(f);
313    }
314}
315
316impl Storage for MultiFileStorage {
317    fn size(&self) -> u64 {
318        let f = self.get_file();
319        let result = f.size();
320        self.put_file(f);
321        result
322    }
323
324    fn read(&self, off: u64, bytes: &mut [u8]) {
325        let f = self.get_file();
326        f.read(off, bytes);
327        self.put_file(f);
328    }
329
330    fn write(&mut self, off: u64, bytes: &[u8]) {
331        let mut f = self.get_file();
332        f.write(off, bytes);
333        self.put_file(f);
334    }
335
336    fn commit(&mut self, size: u64) {
337        let mut f = self.get_file();
338        f.commit(size);
339        self.put_file(f);
340    }
341
342    fn clone(&self) -> Box<dyn Storage> {
343        Box::new(Self {
344            filename: self.filename.clone(),
345            files: self.files.clone(),
346        })
347    }
348}
349
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
// Deriving Default addresses clippy::new_without_default and lets the type be
// used wherever Default is expected; `new` is kept for the boxed convention.
#[derive(Default)]
pub struct DummyFile {}

impl DummyFile {
    /// Construct a boxed DummyFile.
    pub fn new() -> Box<Self> {
        Box::new(Self {})
    }
}
358
impl Storage for DummyFile {
    /// Always reports size 0.
    fn size(&self) -> u64 {
        0
    }

    /// No-op; the output buffer is left untouched.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// No-op; the data is discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// No-op.
    fn commit(&mut self, _size: u64) {}

    fn clone(&self) -> Box<dyn Storage> {
        Self::new()
    }
}
374
/// Memory configuration limits.
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000.
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000.
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000.
    pub uwbuf: usize,
}
387
388impl Default for Limits {
389    fn default() -> Self {
390        Self {
391            map_lim: 5000,
392            rbuf_mem: 0x200000,
393            swbuf: 0x100000,
394            uwbuf: 0x100000,
395        }
396    }
397}
398
/// State shared with the background save thread: a buffered view of the main
/// storage plus the combined writes of commits that are still in flight.
struct CommitFile {
    /// Main storage wrapped in a small-read sector cache.
    stg: ReadBufStg<256>,
    /// Union of the write maps of all pending commits.
    map: WMap,
    /// Number of commits queued but not yet completed.
    todo: usize,
}
404
405impl CommitFile {
406    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
407        Self {
408            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
409            map: WMap::default(),
410            todo: 0,
411        }
412    }
413
414    fn done_one(&mut self) {
415        self.todo -= 1;
416        if self.todo == 0 {
417            self.map = WMap::default();
418            self.stg.reset();
419        }
420    }
421}
422
impl Storage for CommitFile {
    /// Not supported; CommitFile is only read and patched via write_data.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported.
    fn size(&self) -> u64 {
        panic!()
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Pending writes take precedence over the buffered storage.
        self.map.read(start, data, &self.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported; use write_data.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
444
/// Write Buffer. Coalesces contiguous writes before passing them to storage.
struct WriteBuffer {
    /// Number of valid bytes currently held in buf.
    ix: usize,
    /// File offset of the start of buf (u64::MAX when nothing is buffered).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn Storage>,
    /// The buffer itself; capacity fixed at construction.
    buf: Vec<u8>,
    #[cfg(feature = "log")]
    log: Log,
}
455
#[cfg(feature = "log")]
/// Statistics for WriteBuffer logging (counts reset on each commit).
struct Log {
    /// Number of write calls.
    write: u64,
    /// Number of flushes to underlying storage.
    flush: u64,
    /// Total bytes written.
    total: u64,
    /// Time of the first flush since the last commit.
    first_flush_time: std::time::Instant,
}
463
impl WriteBuffer {
    /// Construct with the given underlying storage and buffer capacity.
    pub fn new(stg: Box<dyn Storage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            // u64::MAX marks "no buffered position yet".
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
            #[cfg(feature = "log")]
            log: Log {
                write: 0,
                flush: 0,
                total: 0,
                first_flush_time: std::time::Instant::now(),
            },
        }
    }

    /// Write data to specified offset.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // If the write is not contiguous with the buffered bytes, flush first.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        #[cfg(feature = "log")]
        {
            self.log.write += 1;
            self.log.total += todo as u64;
        }
        while todo > 0 {
            // Space remaining in the buffer; flush when full.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Push buffered bytes to storage and restart buffering at new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
            #[cfg(feature = "log")]
            {
                if self.log.flush == 0 {
                    self.log.first_flush_time = std::time::Instant::now();
                }
                self.log.flush += 1;
            }
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Flush any buffered bytes, then commit the underlying storage.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
        #[cfg(feature = "log")]
        {
            if size > 0 {
                println!(
                    "WriteBuffer commit size={size} write={} flush={} total={} time(micros)={}",
                    self.log.write,
                    self.log.flush,
                    self.log.total,
                    self.log.first_flush_time.elapsed().as_micros()
                );
            }
            self.log.write = 0;
            self.log.flush = 0;
            self.log.total = 0;
        }
    }

    /// Write u64 in little-endian byte order.
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
551
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage the sector buffers are filled from.
    stg: Box<dyn Storage>,
    /// Cache of N-byte sectors, keyed by sector number.
    buf: Mutex<ReadBuffer<N>>,
    /// Reads of at most this many bytes go through the cache.
    limit: usize,
}
562
impl<const N: usize> Drop for ReadBufStg<N> {
    fn drop(&mut self) {
        // Release the cached sector buffers when the wrapper is dropped.
        self.reset();
    }
}
568
569impl<const N: usize> ReadBufStg<N> {
570    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
571    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
572        Self {
573            stg,
574            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
575            limit,
576        }
577    }
578
579    /// Clears the buffers.
580    fn reset(&mut self) {
581        self.buf.lock().unwrap().reset();
582    }
583}
584
585impl<const N: usize> Storage for ReadBufStg<N> {
586    /// Read data from storage.
587    fn read(&self, start: u64, data: &mut [u8]) {
588        if data.len() <= self.limit {
589            self.buf.lock().unwrap().read(&*self.stg, start, data);
590        } else {
591            self.stg.read(start, data);
592        }
593    }
594
595    /// Panics.
596    fn size(&self) -> u64 {
597        panic!()
598    }
599
600    /// Panics.
601    fn write(&mut self, _start: u64, _data: &[u8]) {
602        panic!();
603    }
604
605    /// Panics.
606    fn commit(&mut self, _size: u64) {
607        panic!();
608    }
609}
610
/// Sector cache backing [ReadBufStg].
struct ReadBuffer<const N: usize> {
    /// Cached N-byte sectors keyed by sector number (offset / N).
    map: HashMap<u64, Box<[u8; N]>>,
    /// Cache is cleared once it holds this many sectors.
    max_buf: usize,
    /// Count of buffered reads (reported when the "log" feature is enabled).
    reads: u64,
}
616
impl<const N: usize> ReadBuffer<N> {
    /// Construct an empty cache holding at most max_buf sectors.
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
            reads: 0,
        }
    }

    /// Discard all cached sectors (optionally logging usage statistics).
    fn reset(&mut self) {
        #[cfg(feature = "log")]
        println!(
            "ReadBuffer reset entries={} reads={}",
            self.map.len(),
            self.reads
        );
        self.reads = 0;
        self.map.clear();
    }

    /// Read `data` from `off`, serving each N-byte sector from the cache and
    /// loading missing sectors from `stg`.
    fn read(&mut self, stg: &dyn Storage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64;
            // Sector holding the next byte, and the byte's offset within it.
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            // Copy up to the end of the request or the end of the sector.
            let amount = min(data.len() - done, N - disp);

            self.reads += 1;

            // Fetch the cached sector, loading it from storage on first use.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Simple eviction policy: drop the whole cache once it is full.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
660
#[derive(Default)]
/// Slice of Data to be written to storage.
struct DataSlice {
    /// Slice data (shared; only the off..off+len range belongs to this slice).
    pub data: Data,
    /// Start of slice.
    pub off: usize,
    /// Length of slice.
    pub len: usize,
}
671
672impl DataSlice {
673    /// Get reference to the whole slice.
674    pub fn all(&self) -> &[u8] {
675        &self.data[self.off..self.off + self.len]
676    }
677    /// Get reference to part of slice.
678    pub fn part(&self, off: usize, len: usize) -> &[u8] {
679        &self.data[self.off + off..self.off + off + len]
680    }
681    /// Trim specified amount from start of slice.
682    pub fn trim(&mut self, trim: usize) {
683        self.off += trim;
684        self.len -= trim;
685    }
686    /// Take the data.
687    pub fn take(&mut self) -> Data {
688        std::mem::take(&mut self.data)
689    }
690}
691
#[derive(Default)]
/// Updateable storage based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end of the slice (start + len); slices never overlap.
    map: BTreeMap<u64, DataSlice>,
}
698
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of (start, slice) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are slice end positions; recover the start offset.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn Storage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            // Cloning data is cheap: Data is an Arc.
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only existing writes whose end is after `start` can overlap.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Mutations are deferred to here because the range iteration above
            // holds a borrow of the map.
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    /// Cursor-based variant using pstd's mutable-key cursor API.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn Storage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Walk existing writes that may overlap the requested range.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Tail beyond the last overlapping write comes from storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
849
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of reads and writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes recorded in phase 1, applied in phase 2.
    list: Vec<(u64, DataSlice)>,
    /// Size of stg as of the last completed commit.
    size: u64,
}
862
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        // Replay any update set left over from an interrupted commit.
        result.init();
        result
    }

    /// Apply outstanding updates.
    ///
    /// upd layout: end position (u64), new size (u64), then a sequence of
    /// (start: u64, len: u64, data bytes) records up to the end position.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // No fully recorded update set is pending.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Truncate upd to mark the update set as applied.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position; this marks the recorded update set as valid
            // so an interrupted phase 2 can be replayed by init.
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            // Phase 2: apply the recorded writes to the main storage.
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            // Truncate upd: the update set is fully applied.
            self.upd.commit(0);
        }
    }
}
955
956impl Storage for BasicAtomicFile {
957    fn commit(&mut self, size: u64) {
958        self.commit_phase(size, 1);
959        self.commit_phase(size, 2);
960        self.size = size;
961    }
962
963    fn size(&self) -> u64 {
964        self.size
965    }
966
967    fn read(&self, start: u64, data: &mut [u8]) {
968        self.map.read(start, data, &*self.stg.stg);
969    }
970
971    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
972        self.map.write(start, data, off, len);
973    }
974
975    fn write(&mut self, start: u64, data: &[u8]) {
976        let len = data.len();
977        let d = Arc::new(data.to_vec());
978        self.write_data(start, d, 0, len);
979    }
980}
981
#[cfg(test)]
/// Get amount of testing from environment variable TA (defaults to 1).
fn test_amount() -> usize {
    // as_deref/unwrap_or avoid eagerly allocating a default String
    // (clippy::or_fun_call); panics (as before) if TA is set but not numeric.
    std::env::var("TA")
        .as_deref()
        .unwrap_or("1")
        .parse()
        .unwrap()
}
987
#[test]
fn test_atomic_file() {
    use rand::Rng;
    /* Idea of test is to check AtomicFile and MemFile behave the same */

    let ta = test_amount();

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        // s1 is the implementation under test, s2 is the reference model.
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random offset/length keep operations within a small window so
            // reads and writes overlap frequently.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Random write applied identically to both storages.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Random read; both storages must return identical bytes.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
        }
    }
}