// atom_file/lib.rs
1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5use std::cmp::min;
6use std::sync::{Arc, Mutex, RwLock};
7use rustc_hash::FxHashMap as HashMap;
8
9#[cfg(not(feature = "pstd"))]
10use std::collections::BTreeMap;
11#[cfg(feature = "pstd")]
12use pstd::collections::BTreeMap;
13
/// Shared, immutable byte buffer ( ```Arc<Vec<u8>>``` ) used as the payload of writes.
pub type Data = Arc<Vec<u8>>;
16
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,Storage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// Writes made since the last commit.
    map: WMap,
    /// Shared commit state: writes in flight to underlying storage plus a read buffer.
    cf: Arc<RwLock<CommitFile>>,
    /// Size of the file as set by the most recent commit (or initial storage size).
    size: u64,
    /// Channel for sending (size, write map) batches to the background save thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by the save thread while it is applying a commit; used by wait_complete.
    busy: Arc<Mutex<()>>,
    /// When the CommitFile map exceeds this many entries, commit waits for completion.
    map_lim: usize,
}
41
42impl AtomicFile {
43    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
44    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
45        Self::new_with_limits(stg, upd, &Limits::default())
46    }
47
48    /// Construct Atomic file with specified limits.
49    pub fn new_with_limits(
50        stg: Box<dyn Storage>,
51        upd: Box<dyn Storage>,
52        lim: &Limits,
53    ) -> Box<Self> {
54        let size = stg.size();
55        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
56        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
57        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
58        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.
59
60        // Start the thread which does save asyncronously.
61        let (cf1, busy1) = (cf.clone(), busy.clone());
62        std::thread::spawn(move || {
63            while let Ok((size, map)) = rx.recv() {
64                let _lock = busy1.lock();
65                baf.map = map;
66                baf.commit(size);
67                cf1.write().unwrap().done_one();
68            }
69        });
70        Box::new(Self {
71            map: WMap::default(),
72            cf,
73            size,
74            tx,
75            busy,
76            map_lim: lim.map_lim,
77        })
78    }
79}
80
impl Storage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Bound memory use: if too many writes are still being applied by the
        // save thread, wait for it to catch up before queueing more.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        // Move the latest batch of writes into the CommitFile map (so reads
        // continue to see them), then send the batch to the save thread.
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        map.to_storage(cf);
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Uncommitted writes take precedence; gaps are filled from the CommitFile.
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // todo counts commits not yet fully applied by the save thread.
        // Locking busy blocks until the save thread finishes its current commit.
        while self.cf.read().unwrap().todo != 0 {
            #[cfg(feature = "log")]
            println!("AtomicFile::wait_complete - waiting for writer process");
            let _x = self.busy.lock();
        }
    }
}
123
/// Storage interface - Storage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait Storage: Send + Sync {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec. Provided method wraps the Vec in a [Data] and delegates to write_data.
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice. Provided method delegates to write.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 ( little-endian ).
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 ( little-endian ).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Clone. note: provided method panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }

    /// Wait until current writes are complete. Provided method is a no-op.
    fn wait_complete(&self) {}
}
173
/// Simple implementation of [Storage] using `Vec<u8>`.
#[derive(Default)]
pub struct MemFile {
    /// Backing bytes, shared between clones of this MemFile.
    v: Arc<Mutex<Vec<u8>>>,
}
179
180impl MemFile {
181    /// Get a new (boxed) MemFile.
182    pub fn new() -> Box<Self> {
183        Box::<Self>::default()
184    }
185}
186
187impl Storage for MemFile {
188    fn size(&self) -> u64 {
189        let v = self.v.lock().unwrap();
190        v.len() as u64
191    }
192
193    fn read(&self, off: u64, bytes: &mut [u8]) {
194        let off = off as usize;
195        let len = bytes.len();
196        let mut v = self.v.lock().unwrap();
197        if off + len > v.len() {
198            v.resize(off + len, 0);
199        }
200        bytes.copy_from_slice(&v[off..off + len]);
201    }
202
203    fn write(&mut self, off: u64, bytes: &[u8]) {
204        let off = off as usize;
205        let len = bytes.len();
206        let mut v = self.v.lock().unwrap();
207        if off + len > v.len() {
208            v.resize(off + len, 0);
209        }
210        v[off..off + len].copy_from_slice(bytes);
211    }
212
213    fn commit(&mut self, size: u64) {
214        let mut v = self.v.lock().unwrap();
215        v.resize(size as usize, 0);
216    }
217
218    fn clone(&self) -> Box<dyn Storage> {
219        Box::new(Self { v: self.v.clone() })
220    }
221}
222
223use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
224
/// Simple implementation of [Storage] using `std::fs::File`.
pub struct SimpleFileStorage {
    /// Underlying file handle, shared between clones of this storage.
    file: Arc<Mutex<fs::File>>,
}

impl SimpleFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        // Open for read+write, creating if absent, keeping any existing contents.
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        Box::new(Self {
            file: Arc::new(Mutex::new(file)),
        })
    }
}
246
247impl Storage for SimpleFileStorage {
248    fn size(&self) -> u64 {
249        let mut f = self.file.lock().unwrap();
250        f.seek(SeekFrom::End(0)).unwrap()
251    }
252
253    fn read(&self, off: u64, bytes: &mut [u8]) {
254        let mut f = self.file.lock().unwrap();
255        f.seek(SeekFrom::Start(off)).unwrap();
256        let _ = f.read(bytes).unwrap();
257    }
258
259    fn write(&mut self, off: u64, bytes: &[u8]) {
260        let mut f = self.file.lock().unwrap();
261        // The list of operating systems which auto-zero is likely more than this...research is todo.
262        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
263        {
264            let size = f.seek(SeekFrom::End(0)).unwrap();
265            if off > size {
266                f.set_len(off).unwrap();
267                /*
268                let len = (off - size) as usize;
269                let zb = vec![0; len];
270                f.seek(SeekFrom::Start(size)).unwrap();
271                let _ = f.write(&zb).unwrap();
272                */
273            }
274        }
275        f.seek(SeekFrom::Start(off)).unwrap();
276        let _ = f.write(bytes).unwrap();
277    }
278
279    fn commit(&mut self, size: u64) {
280        let f = self.file.lock().unwrap();
281        f.set_len(size).unwrap();
282        f.sync_all().unwrap();
283    }
284
285    fn clone(&self) -> Box<dyn Storage> {
286        Box::new(Self {
287            file: self.file.clone(),
288        })
289    }
290}
291
292/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads/writes by different threads.
293#[allow(clippy::vec_box)]
294pub struct MultiFileStorage {
295    filename: String,
296    files: Arc<Mutex<Vec<Box<SimpleFileStorage>>>>,
297}
298
299impl MultiFileStorage {
300    /// Create new MultiFileStorage.
301    pub fn new(filename: &str) -> Box<Self> {
302        Box::new(Self {
303            filename: filename.to_string(),
304            files: Arc::new(Mutex::new(Vec::new())),
305        })
306    }
307
308    fn get_file(&self) -> Box<SimpleFileStorage> {
309        match self.files.lock().unwrap().pop() {
310            Some(f) => f,
311            _ => SimpleFileStorage::new(&self.filename),
312        }
313    }
314
315    fn put_file(&self, f: Box<SimpleFileStorage>) {
316        self.files.lock().unwrap().push(f);
317    }
318}
319
320impl Storage for MultiFileStorage {
321    fn size(&self) -> u64 {
322        let f = self.get_file();
323        let result = f.size();
324        self.put_file(f);
325        result
326    }
327
328    fn read(&self, off: u64, bytes: &mut [u8]) {
329        let f = self.get_file();
330        f.read(off, bytes);
331        self.put_file(f);
332    }
333
334    fn write(&mut self, off: u64, bytes: &[u8]) {
335        let mut f = self.get_file();
336        f.write(off, bytes);
337        self.put_file(f);
338    }
339
340    fn commit(&mut self, size: u64) {
341        let mut f = self.get_file();
342        f.commit(size);
343        self.put_file(f);
344    }
345
346    fn clone(&self) -> Box<dyn Storage> {
347        Box::new(Self {
348            filename: self.filename.clone(),
349            files: self.files.clone(),
350        })
351    }
352}
353
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
// Default derived so the type follows the usual new/Default convention
// (clippy::new_without_default).
#[derive(Default)]
pub struct DummyFile {}

impl DummyFile {
    /// Construct (boxed).
    pub fn new() -> Box<Self> {
        Box::new(Self {})
    }
}
362
363impl Storage for DummyFile {
364    fn size(&self) -> u64 {
365        0
366    }
367
368    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
369
370    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
371
372    fn commit(&mut self, _size: u64) {}
373
374    fn clone(&self) -> Box<dyn Storage> {
375        Self::new()
376    }
377}
378
/// Memory configuration limits.
///
/// Construct via [Limits::default] and adjust individual fields as required.
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}
391
392impl Default for Limits {
393    fn default() -> Self {
394        Self {
395            map_lim: 5000,
396            rbuf_mem: 0x200000,
397            swbuf: 0x100000,
398            uwbuf: 0x100000,
399        }
400    }
401}
402
/// Shared commit state used by [AtomicFile]: underlying storage (wrapped to
/// buffer small reads) plus the writes currently being applied by the save thread.
struct CommitFile {
    /// Underlying storage behind a small-read buffer (256-byte sectors).
    stg: ReadBufStg<256>,
    /// Writes sent to the save thread and not yet known to be fully applied.
    map: WMap,
    /// Number of commits sent to the save thread and not yet completed.
    todo: usize,
}
408
409impl CommitFile {
410    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
411        Self {
412            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
413            map: WMap::default(),
414            todo: 0,
415        }
416    }
417
418    fn done_one(&mut self) {
419        self.todo -= 1;
420        if self.todo == 0 {
421            self.map = WMap::default();
422            self.stg.reset();
423        }
424    }
425}
426
impl Storage for CommitFile {
    /// Not supported - panics.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported - panics.
    fn size(&self) -> u64 {
        panic!()
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // In-flight writes take precedence; gaps are read from buffered storage.
        self.map.read(start, data, &self.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported - panics ( use write_data instead ).
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
448
/// Write Buffer. Coalesces sequential writes before passing them to underlying storage.
struct WriteBuffer {
    /// Number of buffered bytes (index of the next free byte in buf).
    ix: usize,
    /// File offset corresponding to the start of buf; initialized to u64::MAX.
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn Storage>,
    /// Buffered bytes awaiting a flush.
    buf: Vec<u8>,
    #[cfg(feature = "log")]
    log: Log,
}
459
/// Statistics recorded by [WriteBuffer] when the "log" feature is enabled.
#[cfg(feature = "log")]
struct Log {
    /// Number of write calls since counters were last reset.
    write: u64,
    /// Number of flushes since counters were last reset.
    flush: u64,
    /// Total bytes written since counters were last reset.
    total: u64,
    /// Time of the first flush since counters were last reset.
    first_flush_time: std::time::Instant,
}
467
468impl WriteBuffer {
469    /// Construct.
470    pub fn new(stg: Box<dyn Storage>, buf_size: usize) -> Self {
471        Self {
472            ix: 0,
473            pos: u64::MAX,
474            stg,
475            buf: vec![0; buf_size],
476            #[cfg(feature = "log")]
477            log: Log {
478                write: 0,
479                flush: 0,
480                total: 0,
481                first_flush_time: std::time::Instant::now(),
482            },
483        }
484    }
485
486    /// Write data to specified offset,
487    pub fn write(&mut self, off: u64, data: &[u8]) {
488        if self.pos + self.ix as u64 != off {
489            self.flush(off);
490        }
491        let mut done: usize = 0;
492        let mut todo: usize = data.len();
493        #[cfg(feature = "log")]
494        {
495            self.log.write += 1;
496            self.log.total += todo as u64;
497        }
498        while todo > 0 {
499            let mut n: usize = self.buf.len() - self.ix;
500            if n == 0 {
501                self.flush(off + done as u64);
502                n = self.buf.len();
503            }
504            if n > todo {
505                n = todo;
506            }
507            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
508            todo -= n;
509            done += n;
510            self.ix += n;
511        }
512    }
513
514    fn flush(&mut self, new_pos: u64) {
515        if self.ix > 0 {
516            self.stg.write(self.pos, &self.buf[0..self.ix]);
517            #[cfg(feature = "log")]
518            {
519                if self.log.flush == 0 {
520                    self.log.first_flush_time = std::time::Instant::now();
521                }
522                self.log.flush += 1;
523            }
524        }
525        self.ix = 0;
526        self.pos = new_pos;
527    }
528
529    /// Commit.
530    pub fn commit(&mut self, size: u64) {
531        self.flush(u64::MAX);
532        self.stg.commit(size);
533        #[cfg(feature = "log")]
534        {
535            if size > 0 {
536                println!(
537                    "WriteBuffer commit size={size} write={} flush={} total={} time(micros)={}",
538                    self.log.write,
539                    self.log.flush,
540                    self.log.total,
541                    self.log.first_flush_time.elapsed().as_micros()
542                );
543            }
544            self.log.write = 0;
545            self.log.flush = 0;
546            self.log.total = 0;
547        }
548    }
549
550    /// Write u64.
551    pub fn write_u64(&mut self, start: u64, value: u64) {
552        self.write(start, &value.to_le_bytes());
553    }
554}
555
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Cache of N-byte sectors, guarded for use through &self.
    buf: Mutex<ReadBuffer<N>>,
    /// Reads of at most this many bytes go through the sector cache.
    limit: usize,
}
566
impl<const N: usize> Drop for ReadBufStg<N> {
    fn drop(&mut self) {
        // Clear the buffers (also prints statistics when the "log" feature is enabled).
        self.reset();
    }
}
572
573impl<const N: usize> ReadBufStg<N> {
574    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
575    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
576        Self {
577            stg,
578            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
579            limit,
580        }
581    }
582
583    /// Clears the buffers.
584    fn reset(&mut self) {
585        self.buf.lock().unwrap().reset();
586    }
587}
588
589impl<const N: usize> Storage for ReadBufStg<N> {
590    /// Read data from storage.
591    fn read(&self, start: u64, data: &mut [u8]) {
592        if data.len() <= self.limit {
593            self.buf.lock().unwrap().read(&*self.stg, start, data);
594        } else {
595            self.stg.read(start, data);
596        }
597    }
598
599    /// Panics.
600    fn size(&self) -> u64 {
601        panic!()
602    }
603
604    /// Panics.
605    fn write(&mut self, _start: u64, _data: &[u8]) {
606        panic!();
607    }
608
609    /// Panics.
610    fn commit(&mut self, _size: u64) {
611        panic!();
612    }
613}
614
/// Sector cache used by [ReadBufStg]; maps sector number to an N-byte buffer.
struct ReadBuffer<const N: usize> {
    /// Map from sector number to cached sector contents.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Once this many sectors are cached, the cache is cleared.
    max_buf: usize,
    /// Count of reads served (statistic, reset with the cache).
    reads: u64,
}
620
621impl<const N: usize> ReadBuffer<N> {
622    fn new(max_buf: usize) -> Self {
623        Self {
624            map: HashMap::default(),
625            max_buf,
626            reads: 0,
627        }
628    }
629
630    fn reset(&mut self) {
631        #[cfg(feature = "log")]
632        println!(
633            "ReadBuffer reset entries={} reads={}",
634            self.map.len(),
635            self.reads
636        );
637        self.reads = 0;
638        self.map.clear();
639    }
640
641    fn read(&mut self, stg: &dyn Storage, off: u64, data: &mut [u8]) {
642        let mut done = 0;
643        while done < data.len() {
644            let off = off + done as u64;
645            let sector = off / N as u64;
646            let disp = (off % N as u64) as usize;
647            let amount = min(data.len() - done, N - disp);
648
649            self.reads += 1;
650
651            let p = self.map.entry(sector).or_insert_with(|| {
652                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
653                stg.read(sector * N as u64, &mut *p);
654                p
655            });
656            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
657            done += amount;
658        }
659        if self.map.len() >= self.max_buf {
660            self.reset();
661        }
662    }
663}
664
#[derive(Default)]
/// Slice of Data to be written to storage.
struct DataSlice {
    /// Slice data.
    pub data: Data,
    /// Start of slice ( byte offset into data ).
    pub off: usize,
    /// Length of slice in bytes.
    pub len: usize,
}
675
676impl DataSlice {
677    /// Get reference to the whole slice.
678    pub fn all(&self) -> &[u8] {
679        &self.data[self.off..self.off + self.len]
680    }
681    /// Get reference to part of slice.
682    pub fn part(&self, off: usize, len: usize) -> &[u8] {
683        &self.data[self.off + off..self.off + off + len]
684    }
685    /// Trim specified amount from start of slice.
686    pub fn trim(&mut self, trim: usize) {
687        self.off += trim;
688        self.len -= trim;
689    }
690    /// Take the data.
691    #[allow(dead_code)]
692    pub fn take(&mut self) -> Data {
693        std::mem::take(&mut self.data)
694    }
695}
696
#[derive(Default)]
/// Updateable storage based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end of the slice ( start is key - len ).
    map: BTreeMap<u64, DataSlice>,
}
703
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are slice end positions; recover the start for the caller.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn Storage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only writes whose end is after start can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Mutations are deferred until iteration is finished.
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first existing write that could overlap.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn Storage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Visit existing writes in order; each may start after the current
            // read position (gap read from u) and then supply some bytes itself.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Any remaining bytes come from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
854
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes ( map converted to Vec during commit phase 1 ).
    list: Vec<(u64, DataSlice)>,
    /// Size of main storage as of the last completed commit.
    size: u64,
}
867
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates ( recovery if phase 2 of a commit did not complete ).
    ///
    /// upd layout: u64 end position at offset 0, u64 new storage size at offset 8,
    /// then records of ( u64 start, u64 len, len data bytes ) up to the end position.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // No outstanding updates.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            // Re-apply the saved update to main storage.
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Truncating upd to zero marks the updates as fully applied.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    ///
    /// Phase 1 durably records the updates in upd; phase 2 applies them to main
    /// storage and then clears upd. If a crash occurs after phase 1, init replays
    /// the updates from upd.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position.
            // Written last so a partially-written upd file is never replayed.
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            self.upd.commit(0);
        }
    }
}
960
961impl Storage for BasicAtomicFile {
962    fn commit(&mut self, size: u64) {
963        self.commit_phase(size, 1);
964        self.commit_phase(size, 2);
965        self.size = size;
966    }
967
968    fn size(&self) -> u64 {
969        self.size
970    }
971
972    fn read(&self, start: u64, data: &mut [u8]) {
973        self.map.read(start, data, &*self.stg.stg);
974    }
975
976    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
977        self.map.write(start, data, off, len);
978    }
979
980    fn write(&mut self, start: u64, data: &[u8]) {
981        let len = data.len();
982        let d = Arc::new(data.to_vec());
983        self.write_data(start, d, 0, len);
984    }
985}
986
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 ).
fn test_amount() -> usize {
    // map_or avoids allocating a default String just to parse it.
    std::env::var("TA").map_or(1, |s| s.parse().unwrap())
}
992
#[test]
fn test_atomic_file() {
    use rand::Rng;
    /* Idea of test is to check AtomicFile and MemFile behave the same */

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        // s1 (under test) and s2 (reference) receive identical operations.
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random overlapping writes/reads in the range 0..120.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Write the same random bytes to both.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read from both and check they agree.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
        }
        s1.commit(200);
        s2.commit(200);
    }
}