Skip to main content

atom_file/
lib.rs

1//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
2//!
3//! [`BasicAtomicFile`] is a non-async alternative.
4
5#![deny(missing_docs)]
6
7use rustc_hash::FxHashMap as HashMap;
8use std::cell::Cell;
9use std::cmp::min;
10use std::sync::{Arc, Mutex, RwLock};
11
12#[cfg(feature = "pstd")]
13use pstd::collections::BTreeMap;
14#[cfg(not(feature = "pstd"))]
15use std::collections::BTreeMap;
16
/// Shared byte data ( ```Arc<Vec<u8>>``` ) used for buffered writes.
pub type Data = Arc<Vec<u8>>;
19
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates are written here.
    map: WMap,
    /// Underlying file, with previous updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size ( as seen by callers; the underlying storage may lag behind ).
    size: u64,
    /// For sending update maps to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by update process while it is active.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map.
    map_lim: usize,
}
50
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // Exits when the sender ( owned by the AtomicFile ) is dropped.
            while let Ok((size, map)) = rx.recv() {
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Tell the CommitFile one outstanding commit has been saved.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
92
impl Storage for AtomicFile {
    /// Not supported : an AtomicFile cannot be cloned ( always panics ).
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
98
impl BasicStorage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // If the CommitFile map has grown too large, wait for the save thread
        // to catch up ( done_one resets the map when nothing is outstanding ).
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Latest ( uncommitted ) updates take precedence, falling back to the CommitFile.
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Spin until the save thread has applied all outstanding commits;
        // locking busy blocks while the save thread is mid-commit.
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
141
/// Committed ( but not yet saved ) updates mapped over buffered underlying storage.
/// Shared between the AtomicFile and its save thread.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
150
151impl CommitFile {
152    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
153        Self {
154            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
155            map: WMap::default(),
156            todo: 0,
157        }
158    }
159
160    fn done_one(&mut self) {
161        self.todo -= 1;
162        if self.todo == 0 {
163            self.map = WMap::default();
164            self.stg.reset();
165        }
166    }
167}
168
impl BasicStorage for CommitFile {
    /// Not supported ( always panics ) : commits are applied by the save thread.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported ( always panics ).
    fn size(&self) -> u64 {
        panic!()
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Committed updates take precedence over the buffered underlying storage.
        self.map.read(start, data, &self.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported ( always panics ) : writes arrive via write_data.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
190
/// Storage interface - BasicStorage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait BasicStorage: Send {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec ( wraps the Vec in an Arc without copying ).
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice ( default implementation delegates to write ).
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 in little-endian format.
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 in little-endian format.
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Wait until current writes are complete ( default is a no-op for synchronous storage ).
    fn wait_complete(&self) {}
}
235
/// BasicStorage that is additionally Sync and can be cloned.
pub trait Storage: BasicStorage + Sync {
    /// Clone, producing a handle to the same underlying storage.
    fn clone(&self) -> Box<dyn Storage>;
}
241
/// Simple in-memory implementation of [Storage] using `Arc<Mutex<Vec<u8>>>`.
#[derive(Default)]
pub struct MemFile {
    /// The shared backing bytes.
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Get a new (boxed) MemFile.
    pub fn new() -> Box<Self> {
        Box::default()
    }
}
254
255impl Storage for MemFile {
256    fn clone(&self) -> Box<dyn Storage> {
257        Box::new(Self { v: self.v.clone() })
258    }
259}
260
261impl BasicStorage for MemFile {
262    fn size(&self) -> u64 {
263        let v = self.v.lock().unwrap();
264        v.len() as u64
265    }
266
267    fn read(&self, off: u64, bytes: &mut [u8]) {
268        let off = off as usize;
269        let len = bytes.len();
270        let mut v = self.v.lock().unwrap();
271        if off + len > v.len() {
272            v.resize(off + len, 0);
273        }
274        bytes.copy_from_slice(&v[off..off + len]);
275    }
276
277    fn write(&mut self, off: u64, bytes: &[u8]) {
278        let off = off as usize;
279        let len = bytes.len();
280        let mut v = self.v.lock().unwrap();
281        if off + len > v.len() {
282            v.resize(off + len, 0);
283        }
284        v[off..off + len].copy_from_slice(bytes);
285    }
286
287    fn commit(&mut self, size: u64) {
288        let mut v = self.v.lock().unwrap();
289        v.resize(size as usize, 0);
290    }
291}
292
293use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
294
/// Plain file wrapper shared by the file-based storage implementations.
struct FileInner {
    /// The open file ( read + write ).
    f: fs::File,
}

impl FileInner {
    /// Construct from filename, creating the file if it does not exist.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// File size in bytes.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read bytes at off. Bytes beyond end of file are left unchanged.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // Loop : a single read call is permitted to return fewer bytes than requested.
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                break; // End of file : remaining bytes are left unchanged.
            }
            done += n;
        }
    }

    /// Write bytes at off, extending the file if necessary.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all : a single write call may write fewer bytes than requested.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length and sync all data to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
340
/// For atomic upd file, if not unix or windows.
/// A [Cell] is used so the file handle can be mutated through `&self` ( e.g. in read ).
pub struct UpdFileStorage {
    /// File handle; taken out of the cell for the duration of each operation.
    file: Cell<Option<FileInner>>,
}

impl UpdFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: Cell::new(Some(FileInner::new(filename))),
        })
    }
}
354
355impl BasicStorage for UpdFileStorage {
356    fn size(&self) -> u64 {
357        let mut f = self.file.take().unwrap();
358        let result = f.size();
359        self.file.set(Some(f));
360        result
361    }
362    fn read(&self, off: u64, bytes: &mut [u8]) {
363        let mut f = self.file.take().unwrap();
364        f.read(off, bytes);
365        self.file.set(Some(f));
366    }
367
368    fn write(&mut self, off: u64, bytes: &[u8]) {
369        let mut f = self.file.take().unwrap();
370        f.write(off, bytes);
371        self.file.set(Some(f));
372    }
373
374    fn commit(&mut self, size: u64) {
375        let mut f = self.file.take().unwrap();
376        f.commit(size);
377        self.file.set(Some(f));
378    }
379}
380
/// Simple implementation of [Storage] using [`std::fs::File`].
pub struct SimpleFileStorage {
    /// Shared file handle; the mutex serialises access from clones.
    file: Arc<Mutex<FileInner>>,
}

impl SimpleFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: Arc::new(Mutex::new(FileInner::new(filename))),
        })
    }
}
394
395impl Storage for SimpleFileStorage {
396    fn clone(&self) -> Box<dyn Storage> {
397        Box::new(Self {
398            file: self.file.clone(),
399        })
400    }
401}
402
403impl BasicStorage for SimpleFileStorage {
404    fn size(&self) -> u64 {
405        self.file.lock().unwrap().size()
406    }
407
408    fn read(&self, off: u64, bytes: &mut [u8]) {
409        self.file.lock().unwrap().read(off, bytes);
410    }
411
412    fn write(&mut self, off: u64, bytes: &[u8]) {
413        self.file.lock().unwrap().write(off, bytes);
414    }
415
416    fn commit(&mut self, size: u64) {
417        self.file.lock().unwrap().commit(size);
418    }
419}
420
421/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
422pub struct AnyFileStorage {
423    filename: String,
424    files: Arc<Mutex<Vec<FileInner>>>,
425}
426
427impl AnyFileStorage {
428    /// Create new.
429    pub fn new(filename: &str) -> Box<Self> {
430        Box::new(Self {
431            filename: filename.to_owned(),
432            files: Arc::new(Mutex::new(Vec::new())),
433        })
434    }
435
436    fn get_file(&self) -> FileInner {
437        match self.files.lock().unwrap().pop() {
438            Some(f) => f,
439            _ => FileInner::new(&self.filename),
440        }
441    }
442
443    fn put_file(&self, f: FileInner) {
444        self.files.lock().unwrap().push(f);
445    }
446}
447
448impl Storage for AnyFileStorage {
449    fn clone(&self) -> Box<dyn Storage> {
450        Box::new(Self {
451            filename: self.filename.clone(),
452            files: self.files.clone(),
453        })
454    }
455}
456
457impl BasicStorage for AnyFileStorage {
458    fn size(&self) -> u64 {
459        let mut f = self.get_file();
460        let result = f.size();
461        self.put_file(f);
462        result
463    }
464
465    fn read(&self, off: u64, bytes: &mut [u8]) {
466        let mut f = self.get_file();
467        f.read(off, bytes);
468        self.put_file(f);
469    }
470
471    fn write(&mut self, off: u64, bytes: &[u8]) {
472        let mut f = self.get_file();
473        f.write(off, bytes);
474        self.put_file(f);
475    }
476
477    fn commit(&mut self, size: u64) {
478        let mut f = self.get_file();
479        f.commit(size);
480        self.put_file(f);
481    }
482}
483
/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
#[derive(Default)]
pub struct DummyFile {}

impl DummyFile {
    /// Construct ( consistent with [MemFile::new] ).
    pub fn new() -> Box<Self> {
        Box::default()
    }
}
492
493impl Storage for DummyFile {
494    fn clone(&self) -> Box<dyn Storage> {
495        Self::new()
496    }
497}
498
impl BasicStorage for DummyFile {
    /// Always zero.
    fn size(&self) -> u64 {
        0
    }

    /// No-op : the buffer is left unchanged.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// No-op : the data is discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// No-op.
    fn commit(&mut self, _size: u64) {}
}
510
/// Memory configuration limits.
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    fn default() -> Self {
        Limits {
            map_lim: 5000,
            rbuf_mem: 2 * 1024 * 1024, // 2MB
            swbuf: 1024 * 1024,        // 1MB
            uwbuf: 1024 * 1024,        // 1MB
        }
    }
}
534
/// Write Buffer : coalesces contiguous writes into larger writes to the underlying storage.
struct WriteBuffer {
    /// Current write index into buf.
    ix: usize,
    /// Current file position ( u64::MAX when no position is established; ix is then always 0 ).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}

impl WriteBuffer {
    /// Construct with the specified buffer size.
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // If the new write is not contiguous with the buffered data, flush first.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // n is the space remaining in the buffer; flush when full.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write any buffered data to the underlying storage, then set the position to new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Flush, then commit the underlying storage at the specified size.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64 in little-endian format.
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
600
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers.
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    /// Discard the buffers on drop.
    fn drop(&mut self) {
        self.reset();
    }
}

impl<const N: usize> ReadBufStg<N> {
    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Clears the buffers.
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}
636
637impl<const N: usize> BasicStorage for ReadBufStg<N> {
638    /// Read data from storage.
639    fn read(&self, start: u64, data: &mut [u8]) {
640        if data.len() <= self.limit {
641            self.buf.lock().unwrap().read(&*self.stg, start, data);
642        } else {
643            self.stg.read(start, data);
644        }
645    }
646
647    /// Panics.
648    fn size(&self) -> u64 {
649        panic!()
650    }
651
652    /// Panics.
653    fn write(&mut self, _start: u64, _data: &[u8]) {
654        panic!();
655    }
656
657    /// Panics.
658    fn commit(&mut self, _size: u64) {
659        panic!();
660    }
661}
662
/// Cache of fixed-size ( N byte ) sector buffers read from underlying storage.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}

impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached buffers.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read data, fetching ( and caching ) whole sectors from stg as needed.
    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64; // Absolute position of next byte to read.
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            // Amount that can be read from this sector.
            let amount = min(data.len() - done, N - disp);

            // Fetch the whole sector from underlying storage if not cached.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction policy : discard everything once the cache is full.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
703
704#[derive(Default)]
705/// Slice of Data to be written to storage.
706struct DataSlice {
707    /// Slice data.
708    pub data: Data,
709    /// Start of slice.
710    pub off: usize,
711    /// Length of slice.
712    pub len: usize,
713}
714
715impl DataSlice {
716    /// Get reference to the whole slice.
717    pub fn all(&self) -> &[u8] {
718        &self.data[self.off..self.off + self.len]
719    }
720    /// Get reference to part of slice.
721    pub fn part(&self, off: usize, len: usize) -> &[u8] {
722        &self.data[self.off + off..self.off + off + len]
723    }
724    /// Trim specified amount from start of slice.
725    pub fn trim(&mut self, trim: usize) {
726        self.off += trim;
727        self.len -= trim;
728    }
729    /// Take the data.
730    #[allow(dead_code)]
731    pub fn take(&mut self) -> Data {
732        std::mem::take(&mut self.data)
733    }
734}
735
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end of the slice.
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start, slice ) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only entries whose key ( end offset ) is after start can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            // Insert the new write.
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first entry whose key ( end offset ) is after start.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write : read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Any remaining data comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
893
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes ( map converted to a Vec in phase 1 of commit ).
    list: Vec<(u64, DataSlice)>,
    /// File size.
    size: u64,
}
906
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Apply outstanding updates ( recovery after an interrupted commit ).
    fn init(&mut self) {
        // upd layout : [0] end position, [8] committed size, then ( start, len, data ) records from 16.
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            // No outstanding updates.
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        // Replay each saved ( start, len, data ) record into main storage.
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Mark the updates as applied.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            // Phase 1 : save the updates to the temporary ( upd ) storage.
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( this makes the saved updates valid for recovery by init ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            // Phase 2 : apply the updates to the main storage.
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            // Mark the updates as applied.
            self.upd.commit(0);
        }
    }
}
999
1000impl BasicStorage for BasicAtomicFile {
1001    fn commit(&mut self, size: u64) {
1002        self.commit_phase(size, 1);
1003        self.commit_phase(size, 2);
1004        self.size = size;
1005    }
1006
1007    fn size(&self) -> u64 {
1008        self.size
1009    }
1010
1011    fn read(&self, start: u64, data: &mut [u8]) {
1012        self.map.read(start, data, &*self.stg.stg);
1013    }
1014
1015    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1016        self.map.write(start, data, off, len);
1017    }
1018
1019    fn write(&mut self, start: u64, data: &[u8]) {
1020        let len = data.len();
1021        let d = Arc::new(data.to_vec());
1022        self.write_data(start, d, 0, len);
1023    }
1024}
1025
/// Optimized implementation of [Storage] ( unix only ).
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    // Logical file size; shared between clones of this storage via the Arc.
    size: Arc<Mutex<u64>>,
    // Underlying OS file; I/O uses positional FileExt calls, so the cursor
    // position shared by try_clone'd handles does not matter.
    f: fs::File,
}
1032#[cfg(target_family = "unix")]
1033impl UnixFileStorage {
1034    /// Construct from filename.
1035    pub fn new(filename: &str) -> Box<Self> {
1036        let mut f = OpenOptions::new()
1037            .read(true)
1038            .write(true)
1039            .create(true)
1040            .truncate(false)
1041            .open(filename)
1042            .unwrap();
1043        let size = f.seek(SeekFrom::End(0)).unwrap();
1044        let size = Arc::new(Mutex::new(size));
1045        Box::new(Self { size, f })
1046    }
1047}
1048
1049#[cfg(target_family = "unix")]
1050impl Storage for UnixFileStorage {
1051    fn clone(&self) -> Box<dyn Storage> {
1052        Box::new(Self {
1053            size: self.size.clone(),
1054            f: self.f.try_clone().unwrap(),
1055        })
1056    }
1057}
1058
1059#[cfg(target_family = "unix")]
1060use std::os::unix::fs::FileExt;
1061
1062#[cfg(target_family = "unix")]
1063impl BasicStorage for UnixFileStorage {
1064    fn read(&self, start: u64, data: &mut [u8]) {
1065        let _ = self.f.read_at(data, start);
1066    }
1067
1068    fn write(&mut self, start: u64, data: &[u8]) {
1069        let _ = self.f.write_at(data, start);
1070    }
1071
1072    fn size(&self) -> u64 {
1073        *self.size.lock().unwrap()
1074    }
1075
1076    fn commit(&mut self, size: u64) {
1077        *self.size.lock().unwrap() = size;
1078        self.f.set_len(size).unwrap();
1079        self.f.sync_all().unwrap();
1080    }
1081}
1082
/// Optimized implementation of [Storage] ( windows only ).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    // Logical file size; shared between clones of this storage via the Arc.
    size: Arc<Mutex<u64>>,
    // Underlying OS file; accessed with the windows FileExt positional calls.
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Open ( creating if necessary, without truncating ) `filename` for
    /// read/write and wrap it in a new storage. Panics on I/O failure.
    pub fn new(filename: &str) -> Box<Self> {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // Seeking to the end reports the current length of the file.
        let initial_size = file.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(initial_size)),
            f: file,
        })
    }
}
1105
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Produce an independent handle onto the same file: the size is shared
    /// through the `Arc`, and the OS file handle is duplicated.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1115
1116#[cfg(target_family = "windows")]
1117use std::os::windows::fs::FileExt;
1118
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read bytes at `start` into `data`.
    ///
    /// Fix: `seek_read` may return fewer bytes than requested, and the
    /// previous code discarded its result, silently truncating such reads.
    /// Loop until the buffer is full or EOF; on EOF the rest of the buffer
    /// is left as supplied by the caller ( reads past the end must not panic ).
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                // Retry interrupted reads.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                // Other errors stay best-effort ( the trait returns () ), as before.
                Err(_) => break,
            }
        }
    }

    /// Write `data` at `start`.
    ///
    /// Fix: `seek_write` may perform a partial write; loop until every byte
    /// is written. Hard I/O errors remain ignored, matching the original
    /// best-effort behaviour.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => break, // No progress; give up rather than spin.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(_) => break,
            }
        }
    }

    /// Logical size as recorded by the last `commit`.
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record the new size, truncate/extend the file to it, then sync.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1139
/// Optimised Storage ( Windows builds use the `seek_read`/`seek_write` based file storage ).
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Optimised Storage ( unix builds use the `read_at`/`write_at` based file storage ).
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Optimised Storage ( portable fallback for targets that are neither unix nor windows ).
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Fast Storage for upd file ( the platform-optimised storage where available ).
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Fast Storage for upd file ( fallback for other targets ).
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1159
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 when unset ).
///
/// Panics if TA is set but does not parse as a usize.
fn test_amount() -> usize {
    // unwrap_or_else: only allocate the default string when TA is absent
    // ( unwrap_or would build it eagerly on every call ).
    std::env::var("TA")
        .unwrap_or_else(|_| "1".to_string())
        .parse()
        .unwrap()
}
1165
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Idea of test is to check AtomicFile and MemFile behave the same:
    // apply an identical random sequence of reads/writes/commits to both
    // and assert every read agrees.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        // let mut s1 = BasicAtomicFile::new(MemFile::new(), MemFile::new(), &Limits::default() );
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random offset in 0..100 and length in 1..=20.
            let off: usize = rng.r#gen::<usize>() % 100;
            let len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Write the same random bytes to both files.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read from both files; results must match exactly.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                // assert_eq! ( rather than assert!(b2 == b3) ) reports the
                // differing buffers on failure.
                assert_eq!(b2, b3);
            }
            // Occasionally commit both, so committed state is compared too.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}