//! [`AtomicFile`] provides buffered concurrent access to files with async atomic commit.
//!
//! [`BasicAtomicFile`] is a non-async alternative.
//!
//! [`MultiFileStorage`] is the recommended backing storage for AtomicFile.
//!
//! [`FastFileStorage`] is the recommended temporary storage for AtomicFile.

9#![deny(missing_docs)]
10
11use rustc_hash::FxHashMap as HashMap;
12use std::cell::Cell;
13use std::cmp::min;
14use std::sync::{Arc, Mutex, RwLock};
15
16#[cfg(feature = "pstd")]
17use pstd::collections::BTreeMap;
18#[cfg(not(feature = "pstd"))]
19use std::collections::BTreeMap;
20
/// Reference-counted byte buffer ( ```Arc<Vec<u8>>``` ), shared between write maps without copying.
pub type Data = Arc<Vec<u8>>;
23
/// Based on [BasicAtomicFile] which makes sure that updates are all-or-nothing.
/// Performs commit asynchronously.
///
/// # Example
///
/// ```
/// use atom_file::{AtomicFile,DummyFile,MemFile,BasicStorage};
/// let mut af = AtomicFile::new(MemFile::new(), DummyFile::new());
/// af.write( 0, &[1,2,3,4] );
/// af.commit(4);
/// af.wait_complete();
/// ```
///
/// Atomic file has two maps of writes. On commit, the latest batch of writes are sent to be written to underlying
/// storage, and are also applied to the second map in the "CommitFile". The CommitFile map is reset when all
/// the updates to underlying storage have been applied.
pub struct AtomicFile {
    /// New updates are written here.
    map: WMap,
    /// Underlying file, with previous updates mapped.
    cf: Arc<RwLock<CommitFile>>,
    /// File size.
    size: u64,
    /// For sending update maps to be saved.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by update process while it is active.
    busy: Arc<Mutex<()>>,
    /// Limit on size of CommitFile map.
    map_lim: usize,
}
54
impl AtomicFile {
    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct Atomic file with specified limits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn BasicStorage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        // The BasicAtomicFile owns a clone of stg and performs the actual
        // two-phase commits on the background thread spawned below.
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);

        // Channel carrying ( new file size, map of writes ) batches to the save thread.
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.

        // Start the thread which does save asynchronously.
        let (cf1, busy1) = (cf.clone(), busy.clone());

        std::thread::spawn(move || {
            // Loop that receives a map of updates and applies it to BasicAtomicFile.
            // Terminates when the sending AtomicFile is dropped ( recv errors ).
            while let Ok((size, map)) = rx.recv() {
                // Holding busy for the duration of the batch lets wait_complete block on it.
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Decrement outstanding count; CommitFile resets its map when it hits zero.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
96
impl Storage for AtomicFile {
    /// Not supported: an AtomicFile owns a background save thread and cannot be cloned.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
102
impl BasicStorage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // If the CommitFile map has grown too large, wait for the save thread
        // to drain all outstanding commits ( which resets the CommitFile map ).
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Apply map of updates to CommitFile.
        map.to_storage(cf);
        // Send map of updates to thread to be written to underlying storage.
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Read from the uncommitted map first, falling back to the CommitFile.
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    fn wait_complete(&self) {
        // Loop until all outstanding commits are saved; acquiring busy blocks
        // while the save thread is processing a batch.
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
145
/// Underlying storage plus the map of writes that have been committed but may
/// not yet have been saved by the background thread.
struct CommitFile {
    /// Buffered underlying storage.
    stg: ReadBufStg<256>,
    /// Map of committed updates.
    map: WMap,
    /// Number of outstanding unsaved commits.
    todo: usize,
}
154
155impl CommitFile {
156    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
157        Self {
158            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
159            map: WMap::default(),
160            todo: 0,
161        }
162    }
163
164    fn done_one(&mut self) {
165        self.todo -= 1;
166        if self.todo == 0 {
167            self.map = WMap::default();
168            self.stg.reset();
169        }
170    }
171}
172
impl BasicStorage for CommitFile {
    /// Not supported: commits are driven by the save thread, never through this impl.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported.
    fn size(&self) -> u64 {
        panic!()
    }

    fn read(&self, start: u64, data: &mut [u8]) {
        // Read from the committed write map, falling back to buffered storage.
        self.map.read(start, data, &self.stg);
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported ( use write_data ).
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
194
/// Storage interface - BasicStorage is some kind of "file" storage.
///
/// read and write methods take a start which is a byte offset in the underlying file.
pub trait BasicStorage: Send {
    /// Get the size of the underlying storage.
    /// Note : this is valid initially and after a commit but is not defined after write is called.
    fn size(&self) -> u64;

    /// Read data.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write byte slice to storage.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write byte Vec.
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write Data slice.
    /// The default delegates to write; map-based implementations override this
    /// to store the shared Data without copying.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish write transaction, size is new size of underlying storage.
    fn commit(&mut self, size: u64);

    /// Write u64 ( stored little-endian ).
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read u64 ( stored little-endian ).
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Wait until current writes are complete.
    /// The default is a no-op ( synchronous implementations are always complete ).
    fn wait_complete(&self) {}
}
239
/// BasicStorage with Sync and clone.
pub trait Storage: BasicStorage + Sync {
    /// Clone, yielding a new handle to the same underlying storage.
    fn clone(&self) -> Box<dyn Storage>;
}
245
/// Simple implementation of [Storage] using `Arc<Mutex<Vec<u8>>`.
///
/// Clones share the same byte vector.
#[derive(Default)]
pub struct MemFile {
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Get a new (boxed) MemFile.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
258
259impl Storage for MemFile {
260    fn clone(&self) -> Box<dyn Storage> {
261        Box::new(Self { v: self.v.clone() })
262    }
263}
264
265impl BasicStorage for MemFile {
266    fn size(&self) -> u64 {
267        let v = self.v.lock().unwrap();
268        v.len() as u64
269    }
270
271    fn read(&self, off: u64, bytes: &mut [u8]) {
272        let off = off as usize;
273        let len = bytes.len();
274        let mut v = self.v.lock().unwrap();
275        if off + len > v.len() {
276            v.resize(off + len, 0);
277        }
278        bytes.copy_from_slice(&v[off..off + len]);
279    }
280
281    fn write(&mut self, off: u64, bytes: &[u8]) {
282        let off = off as usize;
283        let len = bytes.len();
284        let mut v = self.v.lock().unwrap();
285        if off + len > v.len() {
286            v.resize(off + len, 0);
287        }
288        v[off..off + len].copy_from_slice(bytes);
289    }
290
291    fn commit(&mut self, size: u64) {
292        let mut v = self.v.lock().unwrap();
293        v.resize(size as usize, 0);
294    }
295}
296
297use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
298
/// Plain [`std::fs::File`] wrapper used by the storage implementations below.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Construct from filename, creating the file if it does not exist.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Current file size in bytes.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read bytes at off. Bytes beyond end-of-file are left unchanged in the buffer.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // A single read call may return fewer bytes than requested even before
        // end-of-file, so loop until the buffer is full or EOF is reached.
        let mut done = 0;
        while done < bytes.len() {
            let n = self.f.read(&mut bytes[done..]).unwrap();
            if n == 0 {
                break; // End of file: leave the remainder of the buffer untouched.
            }
            done += n;
        }
    }

    /// Write bytes at off, extending the file if necessary.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // The list of operating systems which auto-zero is likely more than this...research is todo.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // write_all: a bare write may perform a short write and silently drop bytes.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length and flush data and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
344
/// For atomic upd file, if not unix or windows.
///
/// Single-threaded: the file handle is kept in a [`Cell`] so that read can be
/// called through &self.
pub struct UpdFileStorage {
    file: Cell<Option<FileInner>>,
}

impl UpdFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: Cell::new(Some(FileInner::new(filename))),
        })
    }
}
358
impl BasicStorage for UpdFileStorage {
    // Each method temporarily takes the FileInner out of the Cell ( the inner
    // methods need &mut ), uses it, then puts it back.
    // NOTE(review): if an inner call panics, the Cell is left empty and any
    // later call will panic on unwrap.
    fn size(&self) -> u64 {
        let mut f = self.file.take().unwrap();
        let result = f.size();
        self.file.set(Some(f));
        result
    }
    fn read(&self, off: u64, bytes: &mut [u8]) {
        let mut f = self.file.take().unwrap();
        f.read(off, bytes);
        self.file.set(Some(f));
    }

    fn write(&mut self, off: u64, bytes: &[u8]) {
        let mut f = self.file.take().unwrap();
        f.write(off, bytes);
        self.file.set(Some(f));
    }

    fn commit(&mut self, size: u64) {
        let mut f = self.file.take().unwrap();
        f.commit(size);
        self.file.set(Some(f));
    }
}
384
/// Simple implementation of [Storage] using [`std::fs::File`].
///
/// A single file handle is shared via a mutex, so concurrent operations serialize.
pub struct SimpleFileStorage {
    file: Arc<Mutex<FileInner>>,
}

impl SimpleFileStorage {
    /// Construct from filename.
    pub fn new(filename: &str) -> Box<Self> {
        Box::new(Self {
            file: Arc::new(Mutex::new(FileInner::new(filename))),
        })
    }
}
398
399impl Storage for SimpleFileStorage {
400    fn clone(&self) -> Box<dyn Storage> {
401        Box::new(Self {
402            file: self.file.clone(),
403        })
404    }
405}
406
407impl BasicStorage for SimpleFileStorage {
408    fn size(&self) -> u64 {
409        self.file.lock().unwrap().size()
410    }
411
412    fn read(&self, off: u64, bytes: &mut [u8]) {
413        self.file.lock().unwrap().read(off, bytes);
414    }
415
416    fn write(&mut self, off: u64, bytes: &[u8]) {
417        self.file.lock().unwrap().write(off, bytes);
418    }
419
420    fn commit(&mut self, size: u64) {
421        self.file.lock().unwrap().commit(size);
422    }
423}
424
425/// Alternative to SimpleFileStorage that uses multiple [SimpleFileStorage]s to allow parallel reads by different threads.
426pub struct AnyFileStorage {
427    filename: String,
428    files: Arc<Mutex<Vec<FileInner>>>,
429}
430
431impl AnyFileStorage {
432    /// Create new.
433    pub fn new(filename: &str) -> Box<Self> {
434        Box::new(Self {
435            filename: filename.to_owned(),
436            files: Arc::new(Mutex::new(Vec::new())),
437        })
438    }
439
440    fn get_file(&self) -> FileInner {
441        match self.files.lock().unwrap().pop() {
442            Some(f) => f,
443            _ => FileInner::new(&self.filename),
444        }
445    }
446
447    fn put_file(&self, f: FileInner) {
448        self.files.lock().unwrap().push(f);
449    }
450}
451
452impl Storage for AnyFileStorage {
453    fn clone(&self) -> Box<dyn Storage> {
454        Box::new(Self {
455            filename: self.filename.clone(),
456            files: self.files.clone(),
457        })
458    }
459}
460
461impl BasicStorage for AnyFileStorage {
462    fn size(&self) -> u64 {
463        let mut f = self.get_file();
464        let result = f.size();
465        self.put_file(f);
466        result
467    }
468
469    fn read(&self, off: u64, bytes: &mut [u8]) {
470        let mut f = self.get_file();
471        f.read(off, bytes);
472        self.put_file(f);
473    }
474
475    fn write(&mut self, off: u64, bytes: &[u8]) {
476        let mut f = self.get_file();
477        f.write(off, bytes);
478        self.put_file(f);
479    }
480
481    fn commit(&mut self, size: u64) {
482        let mut f = self.get_file();
483        f.commit(size);
484        self.put_file(f);
485    }
486}
487
488/// Dummy Stg that can be used for Atomic upd file if "reliable" atomic commits are not required.
489pub struct DummyFile {}
490impl DummyFile {
491    /// Construct.
492    pub fn new() -> Box<Self> {
493        Box::new(Self {})
494    }
495}
496
497impl Storage for DummyFile {
498    fn clone(&self) -> Box<dyn Storage> {
499        Self::new()
500    }
501}
502
503impl BasicStorage for DummyFile {
504    fn size(&self) -> u64 {
505        0
506    }
507
508    fn read(&self, _off: u64, _bytes: &mut [u8]) {}
509
510    fn write(&mut self, _off: u64, _bytes: &[u8]) {}
511
512    fn commit(&mut self, _size: u64) {}
513}
514
/// Memory configuration limits for [`AtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Limit on size of commit write map, default is 5000.
    pub map_lim: usize,
    /// Memory for buffering small reads, default is 0x200000 ( 2MB ).
    pub rbuf_mem: usize,
    /// Memory for buffering writes to main storage, default is 0x100000 (1MB).
    pub swbuf: usize,
    /// Memory for buffering writes to temporary storage, default is 0x100000 (1MB).
    pub uwbuf: usize,
}

impl Default for Limits {
    /// The default limits documented on each field.
    fn default() -> Self {
        Limits {
            map_lim: 5000,
            rbuf_mem: 0x20_0000,
            swbuf: 0x10_0000,
            uwbuf: 0x10_0000,
        }
    }
}
538
/// Write Buffer.
///
/// Accumulates contiguous writes in memory and flushes them to the underlying
/// storage in large chunks.
struct WriteBuffer {
    /// Current write index into buf.
    ix: usize,
    /// Current file position.
    /// u64::MAX serves as a sentinel meaning "no position" ( buffer empty ).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// Buffer.
    buf: Vec<u8>,
}

impl WriteBuffer {
    /// Construct with the given buffer size.
    pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Write data to specified offset.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // If the new write is not contiguous with buffered data, flush first.
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // n = space remaining in the buffer.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: flush, continuing at the current write offset.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write any buffered bytes to storage, then set the buffer position to new_pos.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Flush buffered data and commit the underlying storage at size.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Write u64.
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
604
/// ReadBufStg buffers small (up to limit) reads to the underlying storage using multiple buffers. Only supported functions are read and reset.
///
/// See implementation of AtomicFile for how this is used in conjunction with WMap.
///
/// N is buffer size.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Buffers.
    buf: Mutex<ReadBuffer<N>>,
    /// Read size that is considered small.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    fn drop(&mut self) {
        // Release buffered memory promptly.
        self.reset();
    }
}

impl<const N: usize> ReadBufStg<N> {
    /// limit is the size of a read that is considered "small", max_buf is the maximum number of buffers used.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Clears the buffers.
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}
640
641impl<const N: usize> BasicStorage for ReadBufStg<N> {
642    /// Read data from storage.
643    fn read(&self, start: u64, data: &mut [u8]) {
644        if data.len() <= self.limit {
645            self.buf.lock().unwrap().read(&*self.stg, start, data);
646        } else {
647            self.stg.read(start, data);
648        }
649    }
650
651    /// Panics.
652    fn size(&self) -> u64 {
653        panic!()
654    }
655
656    /// Panics.
657    fn write(&mut self, _start: u64, _data: &[u8]) {
658        panic!();
659    }
660
661    /// Panics.
662    fn commit(&mut self, _size: u64) {
663        panic!();
664    }
665}
666
/// Cache of fixed-size ( N byte ) sectors read from underlying storage.
struct ReadBuffer<const N: usize> {
    /// Maps sector numbers to cached buffers.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Maximum number of buffers.
    max_buf: usize,
}

impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read data at off, fetching any missing sectors from stg.
    fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            // Absolute offset of the next byte to read ( shadows the parameter ).
            let off = off + done as u64;
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            // Bytes available from this sector.
            let amount = min(data.len() - done, N - disp);

            // Fetch the whole sector from storage if it is not already cached.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction: once the cache is full, drop everything
        // ( including sectors fetched by this very read ).
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
707
708#[derive(Default)]
709/// Slice of Data to be written to storage.
710struct DataSlice {
711    /// Slice data.
712    pub data: Data,
713    /// Start of slice.
714    pub off: usize,
715    /// Length of slice.
716    pub len: usize,
717}
718
719impl DataSlice {
720    /// Get reference to the whole slice.
721    pub fn all(&self) -> &[u8] {
722        &self.data[self.off..self.off + self.len]
723    }
724    /// Get reference to part of slice.
725    pub fn part(&self, off: usize, len: usize) -> &[u8] {
726        &self.data[self.off + off..self.off + off + len]
727    }
728    /// Trim specified amount from start of slice.
729    pub fn trim(&mut self, trim: usize) {
730        self.off += trim;
731        self.len -= trim;
732    }
733    /// Take the data.
734    #[allow(dead_code)]
735    pub fn take(&mut self) -> Data {
736        std::mem::take(&mut self.data)
737    }
738}
739
#[derive(Default)]
/// Updateable store based on some underlying storage.
struct WMap {
    /// Map of writes. Key is the end of the slice.
    /// Keying by end offset lets range queries find all slices overlapping a given start.
    map: BTreeMap<u64, DataSlice>,
}
746
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of key-value pairs in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a Vec of ( start offset, slice ) pairs.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            // Map key is the end of the slice; recover the start.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write the map into storage.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    #[cfg(not(feature = "pstd"))]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only existing writes ending after start can overlap ( map keyed by end ).
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    remove.push(ee);
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // put start of existing write in insert list, trim existing write.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // put start of existing write in insert list, remove existing write.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Removals and insertions are deferred: the map cannot be modified
            // while range_mut is iterating it.
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    #[cfg(feature = "pstd")]
    /// Write to storage, existing writes which overlap with new write need to be trimmed or removed.
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first existing write ending after start.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write Start.
                if es >= end {
                    // Existing write starts after end of new write, nothing to do.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write starts before existing write, but doesn't subsume it. Trim existing write.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write subsumes existing write entirely, remove existing write.
                    c.remove_prev();
                } else if end < ee {
                    // New write starts in middle of existing write, ends before end of existing write,
                    // trim existing write, insert start of existing write.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write starts in middle of existing write, ends after existing write,
                    // Trim existing write ( modifies key, but this is ok as ordering is not affected ).
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Insert the new write.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read from storage, taking map of existing writes into account. Unwritten ranges are read from underlying storage.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Iterate over existing writes ending after start, in ascending order.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write Start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this write: read from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Use existing write.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Any remaining range comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
897
/// Basis for [crate::AtomicFile] ( non-async alternative ). Provides two-phase commit and buffering of writes.
pub struct BasicAtomicFile {
    /// The main underlying storage.
    stg: WriteBuffer,
    /// Temporary storage for updates during commit.
    upd: WriteBuffer,
    /// Map of writes.
    map: WMap,
    /// List of writes.
    list: Vec<(u64, DataSlice)>,
    /// Current committed size of the main storage.
    size: u64,
}
910
impl BasicAtomicFile {
    /// stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        // Replay any updates left in upd by an interrupted commit.
        result.init();
        result
    }

    /// Apply outstanding updates.
    ///
    /// upd file layout: end position ( u64 at offset 0 ), committed size
    /// ( u64 at offset 8 ), then a sequence of ( start, len, data ) records.
    /// A zero end position means there is nothing to recover.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        // Replay each saved update record into the main storage.
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Clear the upd file now that recovery is complete.
        self.upd.commit(0);
    }

    /// Perform the specified phase ( 1 or 2 ) of a two-phase commit.
    ///
    /// Phase 1 persists the update records to upd ( so an interrupted phase 2
    /// can be replayed by init ). Phase 2 applies them to the main storage and
    /// clears upd.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Write the updates to upd.
            // First set the end position to zero.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); // Not clear if this is necessary.

            // Write the update records.
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes beyond current stg size can be written directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            self.upd.commit(pos); // Not clear if this is necessary.

            // Set the end position ( marks the upd records as valid ).
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    // Writes beyond current stg size have already been written.
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            // Zero the upd file: the commit is now fully applied.
            self.upd.commit(0);
        }
    }
}
1003
1004impl BasicStorage for BasicAtomicFile {
1005    fn commit(&mut self, size: u64) {
1006        self.commit_phase(size, 1);
1007        self.commit_phase(size, 2);
1008        self.size = size;
1009    }
1010
1011    fn size(&self) -> u64 {
1012        self.size
1013    }
1014
1015    fn read(&self, start: u64, data: &mut [u8]) {
1016        self.map.read(start, data, &*self.stg.stg);
1017    }
1018
1019    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1020        self.map.write(start, data, off, len);
1021    }
1022
1023    fn write(&mut self, start: u64, data: &[u8]) {
1024        let len = data.len();
1025        let d = Arc::new(data.to_vec());
1026        self.write_data(start, d, 0, len);
1027    }
1028}
1029
/// Optimized implementation of [Storage] ( unix only ).
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    size: Arc<Mutex<u64>>,
    f: fs::File,
}
#[cfg(target_family = "unix")]
impl UnixFileStorage {
    /// Construct from filename, creating the file if it does not exist.
    pub fn new(filename: &str) -> Box<Self> {
        let mut f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // The initial size is the current length of the file on disk.
        let initial_size = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(initial_size)),
            f,
        })
    }
}
1052
1053#[cfg(target_family = "unix")]
1054impl Storage for UnixFileStorage {
1055    fn clone(&self) -> Box<dyn Storage> {
1056        Box::new(Self {
1057            size: self.size.clone(),
1058            f: self.f.try_clone().unwrap(),
1059        })
1060    }
1061}
1062
1063#[cfg(target_family = "unix")]
1064use std::os::unix::fs::FileExt;
1065
1066#[cfg(target_family = "unix")]
1067impl BasicStorage for UnixFileStorage {
1068    fn read(&self, start: u64, data: &mut [u8]) {
1069        let _ = self.f.read_at(data, start);
1070    }
1071
1072    fn write(&mut self, start: u64, data: &[u8]) {
1073        let _ = self.f.write_at(data, start);
1074    }
1075
1076    fn size(&self) -> u64 {
1077        *self.size.lock().unwrap()
1078    }
1079
1080    fn commit(&mut self, size: u64) {
1081        *self.size.lock().unwrap() = size;
1082        self.f.set_len(size).unwrap();
1083        self.f.sync_all().unwrap();
1084    }
1085}
1086
/// Optimized implementation of [Storage] ( windows only ).
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    size: Arc<Mutex<u64>>,
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Construct from filename, creating the file if it does not exist.
    pub fn new(filename: &str) -> Box<Self> {
        let mut f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        // The initial size is the current length of the file on disk.
        let initial_size = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(initial_size)),
            f,
        })
    }
}
1109
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Clone the storage: the file handle is duplicated, the size is shared.
    fn clone(&self) -> Box<dyn Storage> {
        let f = self.f.try_clone().unwrap();
        let size = Arc::clone(&self.size);
        Box::new(Self { size, f })
    }
}
1119
1120#[cfg(target_family = "windows")]
1121use std::os::windows::fs::FileExt;
1122
1123#[cfg(target_family = "windows")]
1124impl BasicStorage for WindowsFileStorage {
1125    fn read(&self, start: u64, data: &mut [u8]) {
1126        let _ = self.f.seek_read(data, start);
1127    }
1128
1129    fn write(&mut self, start: u64, data: &[u8]) {
1130        let _ = self.f.seek_write(data, start);
1131    }
1132
1133    fn size(&self) -> u64 {
1134        *self.size.lock().unwrap()
1135    }
1136
1137    fn commit(&mut self, size: u64) {
1138        *self.size.lock().unwrap() = size;
1139        self.f.set_len(size).unwrap();
1140        self.f.sync_all().unwrap();
1141    }
1142}
1143
/// Optimised [Storage] for the main file ( alias varies according to platform ).
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Optimised [Storage] for the main file ( alias varies according to platform ).
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Optimised [Storage] for the main file ( portable fallback for other platforms ).
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Fast [Storage] for the upd ( temporary update ) file ( varies according to platform ).
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Fast [Storage] for the upd ( temporary update ) file ( fallback for other platforms ).
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1163
#[cfg(test)]
/// Get amount of testing from environment variable TA ( defaults to 1 ).
///
/// Panics if TA is set but does not parse as a usize.
fn test_amount() -> usize {
    // unwrap_or_else avoids allocating the default String when TA is set.
    std::env::var("TA")
        .unwrap_or_else(|_| "1".to_string())
        .parse()
        .unwrap()
}
1169
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Compare AtomicFile against a plain MemFile: after identical sequences
    // of writes, reads and commits, both must return identical data.

    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut af = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut reference = MemFile::new();

        for _ in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            if rng.r#gen::<bool>() {
                // Random write applied to both files.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                af.write(off, &bytes);
                reference.write(off, &bytes);
            } else {
                // Read the same range from both files; contents must agree.
                let mut got = vec![0; len];
                let mut want = vec![0; len];
                af.read(off, &mut got);
                reference.read(off, &mut want);
                assert!(got == want);
            }
            // Occasionally commit both files to a fixed size.
            if rng.r#gen::<usize>() % 50 == 0 {
                af.commit(200);
                reference.commit(200);
            }
        }
    }
}