// atom_file/lib.rs
1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5#![deny(missing_docs)]
6
7use rustc_hash::FxHashMap as HashMap;
8use std::cmp::min;
9use std::sync::{Arc, Mutex, RwLock};
10use std::cell::RefCell;
11
12#[cfg(feature = "pstd")]
13use pstd::collections::BTreeMap;
14#[cfg(not(feature = "pstd"))]
15use std::collections::BTreeMap;
16
/// ```Arc<Vec<u8>>```
pub type Data = Arc<Vec<u8>>;

/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New ( uncommitted ) updates are written here.
    map: WMap,
    /// Underlying file, with previously committed ( but not yet saved ) updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size ( as of construction or the most recent commit ).
    size: u64,
    /// For sending update maps to the save thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by the save thread while it is actively saving a commit.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map.
    map_lim: usize,
}
50
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        // The BasicAtomicFile owns a clone of stg and performs the actual two-phase commits.
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // recv returns Err ( ending the thread ) once the sender is dropped.
            while let Ok((size, map)) = rx.recv() {
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Report the commit as saved so the CommitFile map can eventually be reset.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
92
impl Storage for AtomicFile {}

impl BasicStorage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Bound memory use: if the CommitFile map has grown too large,
        // wait for the save thread to catch up ( which resets the map ).
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        // Hold the write lock while publishing the batch, so readers see
        // the todo increment and the applied updates together.
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Uncommitted writes take precedence; gaps are filled from the CommitFile.
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Loop until the save thread has processed all outstanding commits;
        // taking the busy lock blocks while a save is in progress.
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
137
/// Committed-but-not-yet-saved state shared between [AtomicFile] and its save thread.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}

impl CommitFile {
    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
        Self {
            // 256-byte sectors; reads of up to 50 bytes are considered "small".
            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
            map: WMap::default(),
            todo: 0,
        }
    }

    /// Called ( by the save thread ) when one commit has been fully saved.
    fn done_one(&mut self) {
        self.todo -= 1;
        // Once all commits are saved the underlying storage is up to date,
        // so the map is redundant: free it and drop stale read buffers.
        if self.todo == 0 {
            self.map = WMap::default();
            self.stg.reset();
        }
    }
}
164
impl Storage for CommitFile {}

impl BasicStorage for CommitFile {
    /// Not supported - CommitFile is never committed directly.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported - size is tracked by the owning AtomicFile.
    fn size(&self) -> u64 {
        panic!()
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Committed updates take precedence over the ( buffered ) underlying storage.
        self.map.read(start, data, &self.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported - updates arrive via write_data.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
188
/// Storage interface - BasicStorage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait BasicStorage: Send {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec ( the Vec is moved into a shared [Data], avoiding a copy ).
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice ( data[off..off+len] is written at start ).
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 in little-endian byte order.
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 in little-endian byte order.
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Clone. note: provided method panics.
    fn clone(&self) -> Box<dyn Storage + Send + Sync> {
        panic!()
    }

    /// Wait until current writes are complete ( default does nothing ).
    fn wait_complete(&self) {}
}

/// BasicStorage that can be shared between threads.
pub trait Storage: BasicStorage + Sync {}
241
242/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
243#[derive(Default)]
244pub struct MemFile {
245    v: Arc<Mutex<Vec<u8>>>,
246}
247
248impl MemFile {
249    /// Get a new (boxed) MemFile.
250    pub fn new() -> Box<Self> {
251        Box::default()
252    }
253}
254
255impl Storage for MemFile {}
256
257impl BasicStorage for MemFile {
258    fn size(&self) -> u64 {
259        let v = self.v.lock().unwrap();
260        v.len() as u64
261    }
262
263    fn read(&self, off: u64, bytes: &mut [u8]) {
264        let off = off as usize;
265        let len = bytes.len();
266        let mut v = self.v.lock().unwrap();
267        if off + len > v.len() {
268            v.resize(off + len, 0);
269        }
270        bytes.copy_from_slice(&v[off..off + len]);
271    }
272
273    fn write(&mut self, off: u64, bytes: &[u8]) {
274        let off = off as usize;
275        let len = bytes.len();
276        let mut v = self.v.lock().unwrap();
277        if off + len > v.len() {
278            v.resize(off + len, 0);
279        }
280        v[off..off + len].copy_from_slice(bytes);
281    }
282
283    fn commit(&mut self, size: u64) {
284        let mut v = self.v.lock().unwrap();
285        v.resize(size as usize, 0);
286    }
287
288    fn clone(&self) -> Box<dyn Storage + Send + Sync> {
289        Box::new(Self { v: self.v.clone() })
290    }
291}
292
293use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
294
/// Wrapper around [`fs::File`] implementing offset-based read/write.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Open ( creating if necessary, without truncating ) the named file for read/write.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Current file length in bytes.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read into `bytes` from offset `off`. Bytes past end-of-file are zero-filled
    /// ( matching the zero-extension behaviour of MemFile::read ).
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // A single `Read::read` call may legally return fewer bytes than requested,
        // so loop until the buffer is full or EOF ( n == 0 ) is reached.
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                // Past end-of-file: zero the remainder.
                bytes[done..].fill(0);
                break;
            }
            done += n;
        }
    }

    /// Write `bytes` at offset `off`, zero-extending the file first where the OS
    /// does not do so automatically.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all retries on partial writes; a bare `write` may write only a prefix.
        self.f.write_all(bytes).unwrap();
    }

    /// Truncate ( or extend ) to `size` and flush everything to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
340
341/// Can be used for atomic upd file ( does not implement Sync ).
342pub struct FastFileStorage {
343    file: RefCell<FileInner>,
344}
345
346impl FastFileStorage {
347    /// Construct from filename.
348    pub fn new(filename: &str) -> Box<Self> {
349        Box::new(Self {
350            file: RefCell::new(FileInner::new(filename)),
351        })
352    }
353}
354
355impl BasicStorage for FastFileStorage {
356    fn size(&self) -> u64 {
357        self.file.borrow_mut().size()
358    }
359    fn read(&self, off: u64, bytes: &mut [u8]) {
360        self.file.borrow_mut().read(off, bytes);
361    }
362
363    fn write(&mut self, off: u64, bytes: &[u8]) {
364        self.file.borrow_mut().write(off, bytes);
365    }
366
367    fn commit(&mut self, size: u64) {
368        self.file.borrow_mut().commit(size);
369    }
370}
371
372/// Simple implementation of [Storage] using [`std::fs::File`].
373pub struct SimpleFileStorage {
374    file: Arc<Mutex<FileInner>>,
375}
376
377impl SimpleFileStorage {
378    /// Construct from filename.
379    pub fn new(filename: &str) -> Box<Self> {
380        Box::new(Self {
381            file: Arc::new(Mutex::new(FileInner::new(filename))),
382        })
383    }
384}
385
386impl Storage for SimpleFileStorage {}
387
388impl BasicStorage for SimpleFileStorage {
389    fn size(&self) -> u64 {
390        self.file.lock().unwrap().size()
391    }
392
393    fn read(&self, off: u64, bytes: &mut [u8]) {
394        self.file.lock().unwrap().read(off, bytes);
395    }
396
397    fn write(&mut self, off: u64, bytes: &[u8]) {
398        self.file.lock().unwrap().write(off, bytes);
399    }
400
401    fn commit(&mut self, size: u64) {
402        self.file.lock().unwrap().commit(size);
403    }
404
405    fn clone(&self) -> Box<dyn Storage + Send + Sync> {
406        Box::new(Self {
407            file: self.file.clone(),
408        })
409    }
410}
411
412/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
413#[allow(clippy::vec_box)]
414pub struct MultiFileStorage {
415    filename: String,
416    files: Arc<Mutex<Vec<Box<FileInner>>>>,
417}
418
419impl MultiFileStorage {
420    /// Create new MultiFileStorage.
421    pub fn new(filename: &str) -> Box<Self> {
422        Box::new(Self {
423            filename: filename.to_owned(),
424            files: Arc::new(Mutex::new(Vec::new())),
425        })
426    }
427
428    fn get_file(&self) -> Box<FileInner> {
429        match self.files.lock().unwrap().pop() {
430            Some(f) => f,
431            _ => Box::new(FileInner::new(&self.filename)),
432        }
433    }
434
435    fn put_file(&self, f: Box<FileInner>) {
436        self.files.lock().unwrap().push(f);
437    }
438}
439
440impl Storage for MultiFileStorage {}
441
442impl BasicStorage for MultiFileStorage {
443    fn size(&self) -> u64 {
444        let mut f = self.get_file();
445        let result = f.size();
446        self.put_file(f);
447        result
448    }
449
450    fn read(&self, off: u64, bytes: &mut [u8]) {
451        let mut f = self.get_file();
452        f.read(off, bytes);
453        self.put_file(f);
454    }
455
456    fn write(&mut self, off: u64, bytes: &[u8]) {
457        let mut f = self.get_file();
458        f.write(off, bytes);
459        self.put_file(f);
460    }
461
462    fn commit(&mut self, size: u64) {
463        let mut f = self.get_file();
464        f.commit(size);
465        self.put_file(f);
466    }
467
468    fn clone(&self) -> Box<dyn Storage + Send + Sync> {
469        Box::new(Self {
470            filename: self.filename.clone(),
471            files: self.files.clone(),
472        })
473    }
474}
475
476/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
477pub struct DummyFile {}
478impl DummyFile {
479    /// Construct.
480    pub fn new() -> Box<Self> {
481        Box::new(Self {})
482    }
483}
484
485impl Storage for DummyFile {}
486
487impl BasicStorage for DummyFile {
488    fn size(&self) -> u64 {
489        0
490    }
491
492    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
493
494    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
495
496    fn commit(&mut self, _size: u64) {}
497
498    fn clone(&self) -> Box<dyn Storage + Send + Sync> {
499        Self::new()
500    }
501}
502
/// Memory configuration limits ( non_exhaustive: construct via `Limits::default()`
/// and override individual fields ).
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    fn default() -> Self {
        Self {
            map_lim: 5000,
            rbuf_mem: 0x200000,
            swbuf: 0x100000,
            uwbuf: 0x100000,
        }
    }
}
526
/// Write Buffer - coalesces sequential writes into larger writes to the underlying storage.
struct WriteBuffer {
    /// Current write index into buf ( number of buffered bytes ).
    ix: usize,
    /// File position of the start of the buffer. u64::MAX is the initial
    /// "no position" sentinel ( the buffer is empty when pos is reset ).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}

impl WriteBuffer {
    /// Construct with the specified buffer size.
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset,
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // Non-sequential write: flush buffered bytes and restart the buffer at off.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: flush and continue from the current offset.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write any buffered bytes to stg, then restart the ( empty ) buffer at new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Flush buffered bytes, then commit the underlying storage.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 ( little-endian, via write ).
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
592
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers ( sector cache, guarded for shared read access ).
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    fn drop(&mut self) {
        // Release cached buffer memory promptly.
        self.reset();
    }
}

impl<const N: usize> ReadBufStg<N> {
    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Clears the buffers ( call when the underlying storage has changed ).
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}
628
impl<const N: usize> Storage for ReadBufStg<N> {}

impl<const N: usize> BasicStorage for ReadBufStg<N> {
    /// Read data from storage. Small reads ( <= limit bytes ) go through the
    /// sector cache; larger reads bypass it.
    fn read(&self, start: u64, data: &mut [u8]) {
        if data.len() <= self.limit {
            self.buf.lock().unwrap().read(&*self.stg, start, data);
        } else {
            self.stg.read(start, data);
        }
    }

    /// Panics.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Panics.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!();
    }

    /// Panics.
    fn commit(&mut self, _size: u64) {
        panic!();
    }
}
656
/// Cache of fixed-size ( N byte ) sectors read from underlying storage.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}

impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read data via the sector cache, fetching missing sectors from stg.
    fn read(&mut self, stg: &dyn Storage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            // Split the read at sector boundaries.
            let off = off + done as u64;
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            let amount = min(data.len() - done, N - disp);

            // Fetch the whole sector from stg if it is not already cached.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction policy: once the cache is full, discard everything.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
697
698#[derive(Default)]
699/// Slice of Data to be written to storage.
700struct DataSlice {
701    /// Slice data.
702    pub data: Data,
703    /// Start of slice.
704    pub off: usize,
705    /// Length of slice.
706    pub len: usize,
707}
708
709impl DataSlice {
710    /// Get reference to the whole slice.
711    pub fn all(&self) -> &[u8] {
712        &self.data[self.off..self.off + self.len]
713    }
714    /// Get reference to part of slice.
715    pub fn part(&self, off: usize, len: usize) -> &[u8] {
716        &self.data[self.off + off..self.off + off + len]
717    }
718    /// Trim specified amount from start of slice.
719    pub fn trim(&mut self, trim: usize) {
720        self.off += trim;
721        self.len -= trim;
722    }
723    /// Take the data.
724    #[allow(dead_code)]
725    pub fn take(&mut self) -> Data {
726        std::mem::take(&mut self.data)
727    }
728}
729
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end of the slice ( start = end - len ).
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn Storage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Keys are slice ends, so only entries with end > start can overlap.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Removals and insertions are deferred to here, as the map cannot
            // be modified while range_mut is iterating over it.
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    /// ( Cursor-based variant: pstd's BTreeMap cursor allows in-place edits during iteration. )
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Visit writes whose end is past start, in ascending order.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Remainder beyond the last mapped write.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
887
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes ( populated from map in commit phase 1, applied in phase 2 ).
    list: Vec<(u64, DataSlice)>,
    /// Size of the main storage as of the last commit ( or construction ).
    size: u64,
}
900
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates ( crash recovery ).
    ///
    /// upd layout: end position (u64) at offset 0, new file size (u64) at
    /// offset 8, then ( start, len, data ) records from offset 16 up to end.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // End position of zero means there is no complete update file to replay.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Truncating upd to zero marks the updates as fully applied.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    ///
    /// Phase 1 durably records the updates in upd; phase 2 applies them to stg
    /// and clears upd. If a crash occurs after phase 1, init replays the updates.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly
                    // ( a crash cannot corrupt data that did not exist before ).
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this marks the update file as complete ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            self.upd.commit(0);
        }
    }
}
993
impl BasicStorage for BasicAtomicFile {
    fn commit(&mut self, size: u64) {
        // Phase 1 durably records the updates, phase 2 applies them to stg.
        self.commit_phase(size, 1);
        self.commit_phase(size, 2);
        self.size = size;
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Pending writes take precedence over the underlying storage.
        self.map.read(start, data, &*self.stg.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }
}
1019
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 ).
fn test_amount() -> usize {
    // unwrap_or_else avoids allocating the default String when TA is set.
    std::env::var("TA").unwrap_or_else(|_| "1".to_string()).parse().unwrap()
}
1025
#[test]
fn test_atomic_file() {
    use rand::Rng;
    /* Idea of test is to check AtomicFile and MemFile behave the same */

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        // let mut s1 = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random offset, length and operation kind for this step.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Write the same random bytes to both files.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read from both files and check the results agree.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
            // Occasionally commit both files ( fixed size 200 ).
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}