Skip to main content

atom_file/
lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5#[cfg(feature = "pstd")]
6use pstd::collections::BTreeMap;
7use rustc_hash::FxHashMap as HashMap;
8use std::cmp::min;
9use std::sync::{Arc, Mutex, RwLock};
10
11#[cfg(not(feature = "pstd"))]
12use std::collections::BTreeMap;
13
/// Shared, immutable byte buffer: ```Arc<Vec<u8>>```. Cloning is cheap (refcount bump).
pub type Data = Arc<Vec<u8>>;
16
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,Storage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying 
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// Writes made since the last commit.
    map: WMap,
    /// Shared state: writes committed but not yet saved, plus buffered reads of main storage.
    cf: Arc<RwLock<CommitFile>>,
    /// Logical file size (as of the last commit / construction).
    size: u64,
    /// Channel carrying each committed batch (new size, write map) to the save thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Lock held by the save thread while it is writing a batch; used by wait_complete.
    busy: Arc<Mutex<()>>,
    /// When the CommitFile map exceeds this many entries, commit blocks until saves finish.
    map_lim: usize,
}

impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn Storage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());
        std::thread::spawn(move || {
            // Each message is one committed batch; the loop (and thread) ends
            // when the sender side is dropped with the AtomicFile.
            while let Ok((size, map)) = rx.recv() {
                // Hold busy for the duration of the save so wait_complete can block on it.
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Tell the CommitFile one queued commit has been fully applied.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
80
impl Storage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Back-pressure: if too many committed writes are still pending, wait
        // for the save thread to drain them before queueing more.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        // The write guard lives to the end of this function (temporary lifetime
        // extension), so incrementing todo, applying the batch to the CommitFile
        // map and queueing it to the save thread happen atomically w.r.t. readers.
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        map.to_storage(cf);
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Uncommitted writes take precedence; gaps are filled from the
        // CommitFile (committed-but-unsaved writes over buffered main storage).
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Spin until the save thread has applied every queued commit. Taking
        // the busy lock blocks while a batch is mid-save, so each iteration
        // waits for at least one save rather than busy-spinning.
        while self.cf.read().unwrap().todo != 0 {
            #[cfg(feature = "log")]
            println!("AtomicFile::wait_complete - waiting for writer process");
            let _x = self.busy.lock();
        }
    }
}
123
/// Storage interface - Storage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait Storage: Send + Sync {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec. Provided method wraps the Vec and delegates to [Storage::write_data].
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice. Provided method delegates to [Storage::write].
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 (little-endian).
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 (little-endian).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Clone. note: provided method panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }

    /// Wait until current writes are complete. Provided method does nothing.
    fn wait_complete(&self) {}
}
173
/// Simple implementation of [Storage] using `Vec<u8>`.
///
/// The vector is behind an `Arc<Mutex<..>>` so clones share the same contents.
#[derive(Default)]
pub struct MemFile {
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Allocate a fresh, empty MemFile on the heap.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
186
187impl Storage for MemFile {
188    fn size(&self) -> u64 {
189        let v = self.v.lock().unwrap();
190        v.len() as u64
191    }
192
193    fn read(&self, off: u64, bytes: &mut [u8]) {
194        let off = off as usize;
195        let len = bytes.len();
196        let mut v = self.v.lock().unwrap();
197        if off + len > v.len() {
198            v.resize(off + len, 0);
199        }
200        bytes.copy_from_slice(&v[off..off + len]);
201    }
202
203    fn write(&mut self, off: u64, bytes: &[u8]) {
204        let off = off as usize;
205        let len = bytes.len();
206        let mut v = self.v.lock().unwrap();
207        if off + len > v.len() {
208            v.resize(off + len, 0);
209        }
210        v[off..off + len].copy_from_slice(bytes);
211    }
212
213    fn commit(&mut self, size: u64) {
214        let mut v = self.v.lock().unwrap();
215        v.resize(size as usize, 0);
216    }
217
218    fn clone(&self) -> Box<dyn Storage> {
219        Box::new(Self { v: self.v.clone() })
220    }
221}
222
223use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
224
/// Simple implementation of [Storage] using `std::fs::File`.
pub struct SimpleFileStorage {
    file: Arc<Mutex<fs::File>>,
}

impl SimpleFileStorage {
    /// Construct from filename. The file is created if it does not exist,
    /// and is opened for both reading and writing without truncation.
    ///
    /// # Panics
    ///
    /// Panics (with the filename and OS error in the message) if the file
    /// cannot be opened.
    pub fn new(filename: &str) -> Box<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap_or_else(|e| panic!("SimpleFileStorage: cannot open {filename}: {e}"));
        Box::new(Self {
            file: Arc::new(Mutex::new(file)),
        })
    }
}
246
247impl Storage for SimpleFileStorage {
248    fn size(&self) -> u64 {
249        let mut f = self.file.lock().unwrap();
250        f.seek(SeekFrom::End(0)).unwrap()
251    }
252
253    fn read(&self, off: u64, bytes: &mut [u8]) {
254        let mut f = self.file.lock().unwrap();
255        f.seek(SeekFrom::Start(off)).unwrap();
256        let _ = f.read(bytes).unwrap();
257    }
258
259    fn write(&mut self, off: u64, bytes: &[u8]) {
260        let mut f = self.file.lock().unwrap();
261        // The list of operating systems which auto-zero is likely more than this...research is todo.
262        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
263        {
264            let size = f.seek(SeekFrom::End(0)).unwrap();
265            if off > size {
266                f.set_len(off).unwrap();
267                /*
268                let len = (off - size) as usize;
269                let zb = vec![0; len];
270                f.seek(SeekFrom::Start(size)).unwrap();
271                let _ = f.write(&zb).unwrap();
272                */
273            }
274        }
275        f.seek(SeekFrom::Start(off)).unwrap();
276        let _ = f.write(bytes).unwrap();
277    }
278
279    fn commit(&mut self, size: u64) {
280        let f = self.file.lock().unwrap();
281        f.set_len(size).unwrap();
282        f.sync_all().unwrap();
283    }
284
285    fn clone(&self) -> Box<dyn Storage> {
286        Box::new(Self {
287            file: self.file.clone(),
288        })
289    }
290}
291
292/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads/writes by different threads.
293#[allow(clippy::vec_box)]
294pub struct MultiFileStorage {
295    filename: String,
296    files: Arc<Mutex<Vec<Box<SimpleFileStorage>>>>,
297}
298
299impl MultiFileStorage {
300    /// Create new MultiFileStorage.
301    pub fn new(filename: &str) -> Box<Self> {
302        Box::new(Self {
303            filename: filename.to_string(),
304            files: Arc::new(Mutex::new(Vec::new())),
305        })
306    }
307
308    fn get_file(&self) -> Box<SimpleFileStorage> {
309        match self.files.lock().unwrap().pop() {
310            Some(f) => f,
311            _ => SimpleFileStorage::new(&self.filename),
312        }
313    }
314
315    fn put_file(&self, f: Box<SimpleFileStorage>) {
316        self.files.lock().unwrap().push(f);
317    }
318}
319
320impl Storage for MultiFileStorage {
321    fn size(&self) -> u64 {
322        let f = self.get_file();
323        let result = f.size();
324        self.put_file(f);
325        result
326    }
327
328    fn read(&self, off: u64, bytes: &mut [u8]) {
329        let f = self.get_file();
330        f.read(off, bytes);
331        self.put_file(f);
332    }
333
334    fn write(&mut self, off: u64, bytes: &[u8]) {
335        let mut f = self.get_file();
336        f.write(off, bytes);
337        self.put_file(f);
338    }
339
340    fn commit(&mut self, size: u64) {
341        let mut f = self.get_file();
342        f.commit(size);
343        self.put_file(f);
344    }
345
346    fn clone(&self) -> Box<dyn Storage> {
347        Box::new(Self {
348            filename: self.filename.clone(),
349            files: self.files.clone(),
350        })
351    }
352}
353
354/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
355pub struct DummyFile {}
356impl DummyFile {
357    /// Construct.
358    pub fn new() -> Box<Self> {
359        Box::new(Self {})
360    }
361}
362
363impl Storage for DummyFile {
364    fn size(&self) -> u64 {
365        0
366    }
367
368    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
369
370    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
371
372    fn commit(&mut self, _size: u64) {}
373
374    fn clone(&self) -> Box<dyn Storage> {
375        Self::new()
376    }
377}
378
/// Memory configuration limits.
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000.
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000.
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000.
    pub uwbuf: usize,
}

impl Default for Limits {
    /// The defaults documented on each field.
    fn default() -> Self {
        Limits {
            map_lim: 5_000,
            rbuf_mem: 0x20_0000,
            swbuf: 0x10_0000,
            uwbuf: 0x10_0000,
        }
    }
}
402
403struct CommitFile {
404    stg: ReadBufStg<256>,
405    map: WMap,
406    todo: usize,
407}
408
409impl CommitFile {
410    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
411        Self {
412            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
413            map: WMap::default(),
414            todo: 0,
415        }
416    }
417
418    fn done_one(&mut self) {
419        self.todo -= 1;
420        if self.todo == 0 {
421            self.map = WMap::default();
422            self.stg.reset();
423        }
424    }
425}
426
427impl Storage for CommitFile {
428    fn commit(&mut self, _size: u64) {
429        panic!()
430    }
431
432    fn size(&self) -> u64 {
433        panic!()
434    }
435
436    fn read(&self, start: u64, data: &mut [u8]) {
437        self.map.read(start, data, &self.stg);
438    }
439
440    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
441        self.map.write(start, data, off, len);
442    }
443
444    fn write(&mut self, _start: u64, _data: &[u8]) {
445        panic!()
446    }
447}
448
/// Write Buffer.
///
/// Contiguous writes are coalesced in buf and flushed to stg in one call when
/// a non-contiguous write arrives, the buffer fills, or commit is called.
struct WriteBuffer {
    /// Number of valid bytes currently buffered.
    ix: usize,
    /// File offset the buffered bytes start at (u64::MAX is the initial sentinel).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn Storage>,
    /// The buffer; its size is fixed at construction.
    buf: Vec<u8>,
    #[cfg(feature = "log")]
    log: Log,
}

#[cfg(feature = "log")]
/// Per-commit statistics logged by WriteBuffer.
struct Log {
    /// Number of write calls.
    write: u64,
    /// Number of flushes to stg.
    flush: u64,
    /// Total bytes written.
    total: u64,
    /// Time of the first flush since the last commit.
    first_flush_time: std::time::Instant,
}
467
impl WriteBuffer {
    /// Construct.
    pub fn new(stg: Box<dyn Storage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX, // Sentinel: nothing buffered yet.
            stg,
            buf: vec![0; buf_size],
            #[cfg(feature = "log")]
            log: Log {
                write: 0,
                flush: 0,
                total: 0,
                first_flush_time: std::time::Instant::now(),
            },
        }
    }

    /// Write data to specified offset,
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // If the new write is not contiguous with buffered data, flush first
        // and restart buffering at off.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        #[cfg(feature = "log")]
        {
            self.log.write += 1;
            self.log.total += todo as u64;
        }
        while todo > 0 {
            // Space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: flush and continue at the current file position.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Flush buffered bytes (if any) to stg, then restart buffering at new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
            #[cfg(feature = "log")]
            {
                if self.log.flush == 0 {
                    self.log.first_flush_time = std::time::Instant::now();
                }
                self.log.flush += 1;
            }
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Commit.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
        #[cfg(feature = "log")]
        {
            if size > 0 {
                println!(
                    "WriteBuffer commit size={size} write={} flush={} total={} time(micros)={}",
                    self.log.write,
                    self.log.flush,
                    self.log.total,
                    self.log.first_flush_time.elapsed().as_micros()
                );
            }
            self.log.write = 0;
            self.log.flush = 0;
            self.log.total = 0;
        }
    }

    /// Write u64 (little-endian, via the buffer).
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
555
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage the sector buffers are filled from.
    stg: Box<dyn Storage>,
    /// Sector cache, behind a Mutex so read can take &self.
    buf: Mutex<ReadBuffer<N>>,
    /// Reads of at most this many bytes go through the cache.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    // Clear the buffers on drop (with the log feature this also prints stats).
    fn drop(&mut self) {
        self.reset();
    }
}

impl<const N: usize> ReadBufStg<N> {
    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Clears the buffers.
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}
588
589impl<const N: usize> Storage for ReadBufStg<N> {
590    /// Read data from storage.
591    fn read(&self, start: u64, data: &mut [u8]) {
592        if data.len() <= self.limit {
593            self.buf.lock().unwrap().read(&*self.stg, start, data);
594        } else {
595            self.stg.read(start, data);
596        }
597    }
598
599    /// Panics.
600    fn size(&self) -> u64 {
601        panic!()
602    }
603
604    /// Panics.
605    fn write(&mut self, _start: u64, _data: &[u8]) {
606        panic!();
607    }
608
609    /// Panics.
610    fn commit(&mut self, _size: u64) {
611        panic!();
612    }
613}
614
615struct ReadBuffer<const N: usize> {
616    map: HashMap<u64, Box<[u8; N]>>,
617    max_buf: usize,
618    reads: u64,
619}
620
621impl<const N: usize> ReadBuffer<N> {
622    fn new(max_buf: usize) -> Self {
623        Self {
624            map: HashMap::default(),
625            max_buf,
626            reads: 0,
627        }
628    }
629
630    fn reset(&mut self) {
631        #[cfg(feature = "log")]
632        println!(
633            "ReadBuffer reset entries={} reads={}",
634            self.map.len(),
635            self.reads
636        );
637        self.reads = 0;
638        self.map.clear();
639    }
640
641    fn read(&mut self, stg: &dyn Storage, off: u64, data: &mut [u8]) {
642        let mut done = 0;
643        while done < data.len() {
644            let off = off + done as u64;
645            let sector = off / N as u64;
646            let disp = (off % N as u64) as usize;
647            let amount = min(data.len() - done, N - disp);
648
649            self.reads += 1;
650
651            let p = self.map.entry(sector).or_insert_with(|| {
652                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
653                stg.read(sector * N as u64, &mut *p);
654                p
655            });
656            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
657            done += amount;
658        }
659        if self.map.len() >= self.max_buf {
660            self.reset();
661        }
662    }
663}
664
665#[derive(Default)]
666/// Slice of Data to be written to storage.
667struct DataSlice {
668    /// Slice data.
669    pub data: Data,
670    /// Start of slice.
671    pub off: usize,
672    /// Length of slice.
673    pub len: usize,
674}
675
676impl DataSlice {
677    /// Get reference to the whole slice.
678    pub fn all(&self) -> &[u8] {
679        &self.data[self.off..self.off + self.len]
680    }
681    /// Get reference to part of slice.
682    pub fn part(&self, off: usize, len: usize) -> &[u8] {
683        &self.data[self.off + off..self.off + off + len]
684    }
685    /// Trim specified amount from start of slice.
686    pub fn trim(&mut self, trim: usize) {
687        self.off += trim;
688        self.len -= trim;
689    }
690    /// Take the data.
691    pub fn take(&mut self) -> Data {
692        std::mem::take(&mut self.data)
693    }
694}
695
#[derive(Default)]
/// Updateable storage based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end of the slice.
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of (start offset, slice) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are end offsets; recover the start from the slice length.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn Storage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Keys are end offsets, so range_mut(start+1..) visits every
            // existing write that could overlap [start, end).
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Mutations are deferred to here because the map cannot be changed
            // while range_mut is iterating it.
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            // Finally insert the new write itself.
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor-based variant: overlapping entries are fixed up in place.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn Storage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Keys are end offsets, so range(start+1..) visits every write
            // that could overlap [start, start+len).
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Any tail beyond the last overlapping write comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
853
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of reads and writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes.
    list: Vec<(u64, DataSlice)>,
    /// Logical size as of the last commit.
    size: u64,
}

impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates.
    ///
    /// upd layout: [0] end position (0 = no pending commit), [8] committed
    /// size, then records of (start u64, len u64, data bytes) up to end.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // No interrupted commit to recover.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            // Replay the recorded write into main storage.
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Mark the update file as empty again.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    ///
    /// Phase 1 records the updates durably in upd; phase 2 applies them to
    /// main storage and empties upd. If a crash happens after phase 1, init
    /// replays the recorded updates on the next startup.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position.
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            self.upd.commit(0);
        }
    }
}
959
960impl Storage for BasicAtomicFile {
961    fn commit(&mut self, size: u64) {
962        self.commit_phase(size, 1);
963        self.commit_phase(size, 2);
964        self.size = size;
965    }
966
967    fn size(&self) -> u64 {
968        self.size
969    }
970
971    fn read(&self, start: u64, data: &mut [u8]) {
972        self.map.read(start, data, &*self.stg.stg);
973    }
974
975    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
976        self.map.write(start, data, off, len);
977    }
978
979    fn write(&mut self, start: u64, data: &[u8]) {
980        let len = data.len();
981        let d = Arc::new(data.to_vec());
982        self.write_data(start, d, 0, len);
983    }
984}
985
#[cfg(test)]
/// Get amount of testing from environment variable TA, defaulting to 1.
///
/// # Panics
///
/// Panics if TA is set but is not a valid usize.
fn test_amount() -> usize {
    std::env::var("TA")
        // unwrap_or_else avoids allocating the default String eagerly.
        .unwrap_or_else(|_| "1".to_string())
        .parse()
        .unwrap()
}
991
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Idea of test: AtomicFile over MemFile must behave exactly like a plain MemFile.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random offset, length and operation (RNG call order matters for reproducibility).
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let is_write: bool = rng.r#gen();
            if is_write {
                // Apply the same random bytes to both storages.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    bytes.push(rng.r#gen::<u8>());
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Both storages must return identical data.
                let mut got1 = vec![0; len];
                let mut got2 = vec![0; len];
                s1.read(off as u64, &mut got1);
                s2.read(off as u64, &mut got2);
                assert!(got1 == got2);
            }
        }
        s1.commit(200);
        s2.commit(200);
    }
}