1#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24 VecA,
25 collections::{BTreeMapA, btree_map::CustomTuning},
26 localalloc::GTemp,
27 veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
/// With the `pstd` feature, `BTreeMap` is pstd's `BTreeMapA` using `CustomTuning<GTemp>`.
#[cfg(feature = "pstd")]
type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;

/// With the `pstd` feature, `GVec` is pstd's `VecA` parameterised with `GTemp`.
#[cfg(feature = "pstd")]
type GVec<T> = VecA<T, GTemp>;

/// Reference-counted byte vector; writes are passed around as `Data` plus an offset and length.
pub type Data = Arc<Vec<u8>>;
41
42pub struct AtomicFile {
59 map: WMap,
61 cf: Arc<RwLock<CommitFile>>,
63 size: u64,
65 tx: std::sync::mpsc::Sender<(u64, WMap)>,
67 busy: Arc<Mutex<()>>,
69 map_lim: usize,
71}
72
73impl AtomicFile {
74 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76 Self::new_with_limits(stg, upd, &Limits::default())
77 }
78
79 pub fn new_with_limits(
81 stg: Box<dyn Storage>,
82 upd: Box<dyn BasicStorage>,
83 lim: &Limits,
84 ) -> Box<Self> {
85 let size = stg.size();
86 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
94
95 std::thread::spawn(move || {
96 while let Ok((size, map)) = rx.recv() {
98 let _lock = busy1.lock();
99 baf.map = map;
100 baf.commit(size);
101 cf1.write().unwrap().done_one();
102 }
103 });
104 Box::new(Self {
105 map: WMap::default(),
106 cf,
107 size,
108 tx,
109 busy,
110 map_lim: lim.map_lim,
111 })
112 }
113}
114
// `AtomicFile` implements `Storage` but does not support cloning.
impl Storage for AtomicFile {
    /// Always panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
120
121impl BasicStorage for AtomicFile {
122 fn commit(&mut self, size: u64) {
123 self.size = size;
124 if self.map.is_empty() {
125 return;
126 }
127 if self.cf.read().unwrap().map.len() > self.map_lim {
128 self.wait_complete();
129 }
130 let map = std::mem::take(&mut self.map);
131 let cf = &mut *self.cf.write().unwrap();
132 cf.todo += 1;
133 map.to_storage(cf);
135 self.tx.send((size, map)).unwrap();
137 }
138
139 fn size(&self) -> u64 {
140 self.size
141 }
142
143 fn read(&self, start: u64, data: &mut [u8]) {
144 self.map.read(start, data, &*self.cf.read().unwrap());
145 }
146
147 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
148 self.map.write(start, data, off, len);
149 }
150
151 fn write(&mut self, start: u64, data: &[u8]) {
152 let len = data.len();
153 let d = Arc::new(data.to_vec());
154 self.write_data(start, d, 0, len);
155 }
156
157 fn wait_complete(&self) {
158 while self.cf.read().unwrap().todo != 0 {
159 let _x = self.busy.lock();
160 }
161 }
162}
163
164struct CommitFile {
165 stg: ReadBufStg<256>,
167 map: WMap,
169 todo: usize,
171}
172
173impl CommitFile {
174 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
175 Self {
176 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
177 map: WMap::default(),
178 todo: 0,
179 }
180 }
181
182 fn done_one(&mut self) {
183 self.todo -= 1;
184 if self.todo == 0 {
185 self.map = WMap::default();
186 self.stg.reset();
187 }
188 }
189}
190
191impl BasicStorage for CommitFile {
192 fn commit(&mut self, _size: u64) {
193 panic!()
194 }
195
196 fn size(&self) -> u64 {
197 panic!()
198 }
199
200 fn read(&self, start: u64, data: &mut [u8]) {
201 self.map.read(start, data, &self.stg);
202 }
203
204 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
205 self.map.write(start, data, off, len);
206 }
207
208 fn write(&mut self, _start: u64, _data: &[u8]) {
209 panic!()
210 }
211}
212
213pub trait BasicStorage: Send {
217 fn size(&self) -> u64;
220
221 fn read(&self, start: u64, data: &mut [u8]);
223
224 fn write(&mut self, start: u64, data: &[u8]);
226
227 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
229 let len = data.len();
230 let d = Arc::new(data);
231 self.write_data(start, d, 0, len);
232 }
233
234 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
236 self.write(start, &data[off..off + len]);
237 }
238
239 fn commit(&mut self, size: u64);
241
242 fn write_u64(&mut self, start: u64, value: u64) {
244 self.write(start, &value.to_le_bytes());
245 }
246
247 fn read_u64(&self, start: u64) -> u64 {
249 let mut bytes = [0; 8];
250 self.read(start, &mut bytes);
251 u64::from_le_bytes(bytes)
252 }
253
254 fn wait_complete(&self) {}
256}
257
/// A [`BasicStorage`] that is also `Sync` and can hand out further boxed handles via `clone`.
pub trait Storage: BasicStorage + Sync {
    /// Return a new boxed `Storage` handle derived from this one.
    fn clone(&self) -> Box<dyn Storage>;
}
263
264#[derive(Default)]
266pub struct MemFile {
267 v: Arc<Mutex<Vec<u8>>>,
268}
269
270impl MemFile {
271 pub fn new() -> Box<Self> {
273 Box::default()
274 }
275}
276
277impl Storage for MemFile {
278 fn clone(&self) -> Box<dyn Storage> {
279 Box::new(Self { v: self.v.clone() })
280 }
281}
282
283impl BasicStorage for MemFile {
284 fn size(&self) -> u64 {
285 let v = self.v.lock().unwrap();
286 v.len() as u64
287 }
288
289 fn read(&self, off: u64, bytes: &mut [u8]) {
290 let off = off as usize;
291 let len = bytes.len();
292 let mut v = self.v.lock().unwrap();
293 if off + len > v.len() {
294 v.resize(off + len, 0);
295 }
296 bytes.copy_from_slice(&v[off..off + len]);
297 }
298
299 fn write(&mut self, off: u64, bytes: &[u8]) {
300 let off = off as usize;
301 let len = bytes.len();
302 let mut v = self.v.lock().unwrap();
303 if off + len > v.len() {
304 v.resize(off + len, 0);
305 }
306 v[off..off + len].copy_from_slice(bytes);
307 }
308
309 fn commit(&mut self, size: u64) {
310 let mut v = self.v.lock().unwrap();
311 v.resize(size as usize, 0);
312 }
313}
314
315use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
316
/// Thin wrapper around an open [`fs::File`] offering seek-based positional I/O.
/// I/O failures panic.
struct FileInner {
    f: fs::File,
}

impl FileInner {
    /// Open `filename` for reading and writing, creating it if it does not exist.
    /// Existing contents are kept.
    ///
    /// # Panics
    /// Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Current length of the file, found by seeking to its end.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read into `bytes` starting at offset `off`.
    ///
    /// A single `read` call may return fewer bytes than requested, so this loops
    /// until the buffer is full or end-of-file is reached. Bytes of `bytes` that
    /// lie beyond end-of-file are left unchanged.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            match self.f.read(&mut bytes[done..]) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file read failed: {:?}", e),
            }
        }
    }

    /// Write all of `bytes` at offset `off`.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // On targets other than Windows and Linux, explicitly extend the file up to
        // `off` before writing beyond the current end.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // `write` may perform a partial write; `write_all` keeps going until done.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to `size` and sync data and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
362
363pub struct UpdFileStorage {
365 file: Cell<Option<FileInner>>,
366}
367
368impl UpdFileStorage {
369 pub fn new(filename: &str) -> Box<Self> {
371 Box::new(Self {
372 file: Cell::new(Some(FileInner::new(filename))),
373 })
374 }
375}
376
377impl BasicStorage for UpdFileStorage {
378 fn size(&self) -> u64 {
379 let mut f = self.file.take().unwrap();
380 let result = f.size();
381 self.file.set(Some(f));
382 result
383 }
384 fn read(&self, off: u64, bytes: &mut [u8]) {
385 let mut f = self.file.take().unwrap();
386 f.read(off, bytes);
387 self.file.set(Some(f));
388 }
389
390 fn write(&mut self, off: u64, bytes: &[u8]) {
391 let mut f = self.file.take().unwrap();
392 f.write(off, bytes);
393 self.file.set(Some(f));
394 }
395
396 fn commit(&mut self, size: u64) {
397 let mut f = self.file.take().unwrap();
398 f.commit(size);
399 self.file.set(Some(f));
400 }
401}
402
403pub struct SimpleFileStorage {
405 file: Arc<Mutex<FileInner>>,
406}
407
408impl SimpleFileStorage {
409 pub fn new(filename: &str) -> Box<Self> {
411 Box::new(Self {
412 file: Arc::new(Mutex::new(FileInner::new(filename))),
413 })
414 }
415}
416
417impl Storage for SimpleFileStorage {
418 fn clone(&self) -> Box<dyn Storage> {
419 Box::new(Self {
420 file: self.file.clone(),
421 })
422 }
423}
424
425impl BasicStorage for SimpleFileStorage {
426 fn size(&self) -> u64 {
427 self.file.lock().unwrap().size()
428 }
429
430 fn read(&self, off: u64, bytes: &mut [u8]) {
431 self.file.lock().unwrap().read(off, bytes);
432 }
433
434 fn write(&mut self, off: u64, bytes: &[u8]) {
435 self.file.lock().unwrap().write(off, bytes);
436 }
437
438 fn commit(&mut self, size: u64) {
439 self.file.lock().unwrap().commit(size);
440 }
441}
442
443pub struct AnyFileStorage {
445 filename: String,
446 files: Arc<Mutex<Vec<FileInner>>>,
447}
448
449impl AnyFileStorage {
450 pub fn new(filename: &str) -> Box<Self> {
452 Box::new(Self {
453 filename: filename.to_owned(),
454 files: Arc::new(Mutex::new(Vec::new())),
455 })
456 }
457
458 fn get_file(&self) -> FileInner {
459 match self.files.lock().unwrap().pop() {
460 Some(f) => f,
461 _ => FileInner::new(&self.filename),
462 }
463 }
464
465 fn put_file(&self, f: FileInner) {
466 self.files.lock().unwrap().push(f);
467 }
468}
469
470impl Storage for AnyFileStorage {
471 fn clone(&self) -> Box<dyn Storage> {
472 Box::new(Self {
473 filename: self.filename.clone(),
474 files: self.files.clone(),
475 })
476 }
477}
478
479impl BasicStorage for AnyFileStorage {
480 fn size(&self) -> u64 {
481 let mut f = self.get_file();
482 let result = f.size();
483 self.put_file(f);
484 result
485 }
486
487 fn read(&self, off: u64, bytes: &mut [u8]) {
488 let mut f = self.get_file();
489 f.read(off, bytes);
490 self.put_file(f);
491 }
492
493 fn write(&mut self, off: u64, bytes: &[u8]) {
494 let mut f = self.get_file();
495 f.write(off, bytes);
496 self.put_file(f);
497 }
498
499 fn commit(&mut self, size: u64) {
500 let mut f = self.get_file();
501 f.commit(size);
502 self.put_file(f);
503 }
504}
505
506pub struct DummyFile {}
508impl DummyFile {
509 pub fn new() -> Box<Self> {
511 Box::new(Self {})
512 }
513}
514
515impl Storage for DummyFile {
516 fn clone(&self) -> Box<dyn Storage> {
517 Self::new()
518 }
519}
520
521impl BasicStorage for DummyFile {
522 fn size(&self) -> u64 {
523 0
524 }
525
526 fn read(&self, _off: u64, _bytes: &mut [u8]) {}
527
528 fn write(&mut self, _off: u64, _bytes: &[u8]) {}
529
530 fn commit(&mut self, _size: u64) {}
531}
532
/// Memory limits used when constructing an `AtomicFile` or `BasicAtomicFile`.
#[non_exhaustive]
pub struct Limits {
    /// Number of entries in the shared commit map above which a commit waits
    /// for outstanding commits to finish.
    pub map_lim: usize,
    /// Memory, in bytes, for buffering small reads from the underlying storage.
    pub rbuf_mem: usize,
    /// Size, in bytes, of the write buffer for the main storage.
    pub swbuf: usize,
    /// Size, in bytes, of the write buffer for the update storage.
    pub uwbuf: usize,
}

impl Default for Limits {
    /// Defaults: 5000 map entries, 2 MiB read buffer, 1 MiB for each write buffer.
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        Self {
            map_lim: 5_000,
            rbuf_mem: 2 * MIB,
            swbuf: MIB,
            uwbuf: MIB,
        }
    }
}
556
557struct WriteBuffer {
559 ix: usize,
561 pos: u64,
563 pub stg: Box<dyn BasicStorage>,
565 buf: Vec<u8>,
567}
568
569impl WriteBuffer {
570 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
572 Self {
573 ix: 0,
574 pos: u64::MAX,
575 stg,
576 buf: vec![0; buf_size],
577 }
578 }
579
580 pub fn write(&mut self, off: u64, data: &[u8]) {
582 if self.pos + self.ix as u64 != off {
583 self.flush(off);
584 }
585 let mut done: usize = 0;
586 let mut todo: usize = data.len();
587 while todo > 0 {
588 let mut n: usize = self.buf.len() - self.ix;
589 if n == 0 {
590 self.flush(off + done as u64);
591 n = self.buf.len();
592 }
593 if n > todo {
594 n = todo;
595 }
596 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
597 todo -= n;
598 done += n;
599 self.ix += n;
600 }
601 }
602
603 fn flush(&mut self, new_pos: u64) {
604 if self.ix > 0 {
605 self.stg.write(self.pos, &self.buf[0..self.ix]);
606 }
607 self.ix = 0;
608 self.pos = new_pos;
609 }
610
611 pub fn commit(&mut self, size: u64) {
613 self.flush(u64::MAX);
614 self.stg.commit(size);
615 }
616
617 pub fn write_u64(&mut self, start: u64, value: u64) {
619 self.write(start, &value.to_le_bytes());
620 }
621}
622
623struct ReadBufStg<const N: usize> {
629 stg: Box<dyn Storage>,
631 buf: Mutex<ReadBuffer<N>>,
633 limit: usize,
635}
636
637impl<const N: usize> Drop for ReadBufStg<N> {
638 fn drop(&mut self) {
639 self.reset();
640 }
641}
642
643impl<const N: usize> ReadBufStg<N> {
644 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
646 Self {
647 stg,
648 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
649 limit,
650 }
651 }
652
653 fn reset(&mut self) {
655 self.buf.lock().unwrap().reset();
656 }
657}
658
659impl<const N: usize> BasicStorage for ReadBufStg<N> {
660 fn read(&self, start: u64, data: &mut [u8]) {
662 if data.len() <= self.limit {
663 self.buf.lock().unwrap().read(&*self.stg, start, data);
664 } else {
665 self.stg.read(start, data);
666 }
667 }
668
669 fn size(&self) -> u64 {
671 panic!()
672 }
673
674 fn write(&mut self, _start: u64, _data: &[u8]) {
676 panic!();
677 }
678
679 fn commit(&mut self, _size: u64) {
681 panic!();
682 }
683}
684
685struct ReadBuffer<const N: usize> {
686 map: HashMap<u64, Box<[u8; N]>>,
688 max_buf: usize,
690}
691
692impl<const N: usize> ReadBuffer<N> {
693 fn new(max_buf: usize) -> Self {
694 Self {
695 map: HashMap::default(),
696 max_buf,
697 }
698 }
699
700 fn reset(&mut self) {
701 self.map.clear();
702 }
703
704 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
705 let mut done = 0;
706 while done < data.len() {
707 let off = off + done as u64;
708 let sector = off / N as u64;
709 let disp = (off % N as u64) as usize;
710 let amount = min(data.len() - done, N - disp);
711
712 let p = self.map.entry(sector).or_insert_with(|| {
713 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
714 stg.read(sector * N as u64, &mut *p);
715 p
716 });
717 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
718 done += amount;
719 }
720 if self.map.len() >= self.max_buf {
721 self.reset();
722 }
723 }
724}
725
726#[derive(Default)]
727struct DataSlice {
729 pub data: Data,
731 pub off: usize,
733 pub len: usize,
735}
736
737impl DataSlice {
738 pub fn all(&self) -> &[u8] {
740 &self.data[self.off..self.off + self.len]
741 }
742 pub fn part(&self, off: usize, len: usize) -> &[u8] {
744 &self.data[self.off + off..self.off + off + len]
745 }
746 pub fn trim(&mut self, trim: usize) {
748 self.off += trim;
749 self.len -= trim;
750 }
751 #[allow(dead_code)]
753 pub fn take(&mut self) -> Data {
754 std::mem::take(&mut self.data)
755 }
756}
757
/// Map of written byte ranges. Each entry is keyed by the END offset of its
/// range (start + length), and the value holds the bytes of that range.
#[derive(Default)]
struct WMap {
    /// End offset of a range -> the data written to that range.
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// True if no writes are recorded.
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of recorded ranges.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Move all entries out of the map (leaving it empty) and return them as
    /// `(start, slice)` pairs, where `start` is the entry's end key minus its length.
    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = GVec::with_capacity(map.len());
        for (end, v) in map {
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Replay every recorded range into `stg` using `write_data`; the map is not modified.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    /// Record a write of `len` bytes (taken from `data` at `off`) at offset `start`.
    /// Existing entries overlapping the new range are trimmed, split or removed.
    /// Zero-length writes are ignored.
    #[cfg(not(feature = "pstd"))]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            // Changes are collected first and applied after the iteration, since the
            // map cannot be restructured while `range_mut` is borrowing it.
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only entries whose end is beyond `start` can overlap the new range.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                // Start of the existing entry.
                let es = ee - v.len as u64;
                if es >= end {
                    // Existing entry begins at or after the end of the new range: done.
                    break;
                } else if start <= es {
                    // Existing entry begins inside the new range.
                    if end < ee {
                        // It extends past the new range: keep only its tail.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // It is entirely covered by the new range.
                    remove.push(ee);
                } else if end < ee {
                    // New range lies strictly inside the existing entry: keep the part
                    // before `start` as a separate entry and trim this one to its tail.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // Existing entry begins before `start` and ends within the new range:
                    // keep only the part before `start`, re-keyed at its new end.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Record a write of `len` bytes (taken from `data` at `off`) at offset `start`.
    ///
    /// Follows the same case analysis as the non-`pstd` version, but edits the map
    /// in place through a mutable cursor.
    // NOTE(review): the exact cursor semantics (`prev`, `remove_prev`,
    // `insert_before_unchecked`, `insert_after_unchecked`) come from the pstd
    // crate and are not visible here — confirm against its documentation.
    #[cfg(feature = "pstd")]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                // Start of the existing entry.
                let es = ee - v.len as u64;
                if es >= end {
                    // Existing entry begins at or after the end of the new range.
                    c.prev();
                    break;
                } else if start <= es {
                    // Existing entry begins inside the new range.
                    if end < ee {
                        // It extends past the new range: keep only its tail.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // It is entirely covered by the new range.
                    c.remove_prev();
                } else if end < ee {
                    // New range lies strictly inside the existing entry: split it.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // Existing entry ends within the new range: shorten it so that it
                    // ends at `start`, and update its key to match.
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Fill `data` with the bytes at offset `start`, taking them from recorded
    /// writes where available and from `u` for the gaps between them.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Visit entries whose end lies beyond `start`, in ascending order.
            for (&end, v) in self.map.range(start + 1..) {
                // Start of this entry.
                let es = end - v.len as u64;
                // Offset of the next byte still to be produced.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this entry: read it from the underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Bytes of the entry that lie before the current read position.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Whatever remains after the last entry comes from the underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
915
916pub struct BasicAtomicFile {
918 stg: WriteBuffer,
920 upd: WriteBuffer,
922 map: WMap,
924 list: GVec<(u64, DataSlice)>,
926 size: u64,
927}
928
929impl BasicAtomicFile {
930 pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
932 let size = stg.size();
933 let mut result = Box::new(Self {
934 stg: WriteBuffer::new(stg, lim.swbuf),
935 upd: WriteBuffer::new(upd, lim.uwbuf),
936 map: WMap::default(),
937 list: GVec::new(),
938 size,
939 });
940 result.init();
941 result
942 }
943
944 fn init(&mut self) {
946 let end = self.upd.stg.read_u64(0);
947 let size = self.upd.stg.read_u64(8);
948 if end == 0 {
949 return;
950 }
951 assert!(end == self.upd.stg.size());
952 let mut pos = 16;
953 while pos < end {
954 let start = self.upd.stg.read_u64(pos);
955 pos += 8;
956 let len = self.upd.stg.read_u64(pos);
957 pos += 8;
958 let mut buf: GVec<u8> = gvec![0; len as usize];
959 self.upd.stg.read(pos, &mut buf);
960 pos += len;
961 self.stg.write(start, &buf);
962 }
963 self.stg.commit(size);
964 self.upd.commit(0);
965 }
966
967 pub fn commit_phase(&mut self, size: u64, phase: u8) {
969 if self.map.is_empty() && self.list.is_empty() {
970 return;
971 }
972 if phase == 1 {
973 self.list = self.map.convert_to_vec();
974
975 self.upd.write_u64(0, 0);
978 self.upd.write_u64(8, size);
979 self.upd.commit(16); let mut stg_written = false;
983 let mut pos: u64 = 16;
984 for (start, v) in self.list.iter() {
985 let (start, len, data) = (*start, v.len as u64, v.all());
986 if start >= self.size {
987 stg_written = true;
989 self.stg.write(start, data);
990 } else {
991 self.upd.write_u64(pos, start);
992 pos += 8;
993 self.upd.write_u64(pos, len);
994 pos += 8;
995 self.upd.write(pos, data);
996 pos += len;
997 }
998 }
999 if stg_written {
1000 self.stg.commit(size);
1001 }
1002 self.upd.commit(pos); self.upd.write_u64(0, pos);
1006 self.upd.write_u64(8, size);
1007 self.upd.commit(pos);
1008 } else {
1009 for (start, v) in self.list.iter() {
1010 if *start < self.size {
1011 self.stg.write(*start, v.all());
1013 }
1014 }
1015 self.list = GVec::new();
1016 self.stg.commit(size);
1017 self.upd.commit(0);
1018 }
1019 }
1020}
1021
1022impl BasicStorage for BasicAtomicFile {
1023 fn commit(&mut self, size: u64) {
1024 self.commit_phase(size, 1);
1025 self.commit_phase(size, 2);
1026 self.size = size;
1027 }
1028
1029 fn size(&self) -> u64 {
1030 self.size
1031 }
1032
1033 fn read(&self, start: u64, data: &mut [u8]) {
1034 self.map.read(start, data, &*self.stg.stg);
1035 }
1036
1037 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1038 self.map.write(start, data, off, len);
1039 }
1040
1041 fn write(&mut self, start: u64, data: &[u8]) {
1042 let len = data.len();
1043 let d = Arc::new(data.to_vec());
1044 self.write_data(start, d, 0, len);
1045 }
1046}
1047
1048#[cfg(target_family = "unix")]
1050pub struct UnixFileStorage {
1051 size: Arc<Mutex<u64>>,
1052 f: fs::File,
1053}
1054#[cfg(target_family = "unix")]
1055impl UnixFileStorage {
1056 pub fn new(filename: &str) -> Box<Self> {
1058 let mut f = OpenOptions::new()
1059 .read(true)
1060 .write(true)
1061 .create(true)
1062 .truncate(false)
1063 .open(filename)
1064 .unwrap();
1065 let size = f.seek(SeekFrom::End(0)).unwrap();
1066 let size = Arc::new(Mutex::new(size));
1067 Box::new(Self { size, f })
1068 }
1069}
1070
1071#[cfg(target_family = "unix")]
1072impl Storage for UnixFileStorage {
1073 fn clone(&self) -> Box<dyn Storage> {
1074 Box::new(Self {
1075 size: self.size.clone(),
1076 f: self.f.try_clone().unwrap(),
1077 })
1078 }
1079}
1080
1081#[cfg(target_family = "unix")]
1082use std::os::unix::fs::FileExt;
1083
1084#[cfg(target_family = "unix")]
1085impl BasicStorage for UnixFileStorage {
1086 fn read(&self, start: u64, data: &mut [u8]) {
1087 let _ = self.f.read_at(data, start);
1088 }
1089
1090 fn write(&mut self, start: u64, data: &[u8]) {
1091 let _ = self.f.write_at(data, start);
1092 }
1093
1094 fn size(&self) -> u64 {
1095 *self.size.lock().unwrap()
1096 }
1097
1098 fn commit(&mut self, size: u64) {
1099 *self.size.lock().unwrap() = size;
1100 self.f.set_len(size).unwrap();
1101 self.f.sync_all().unwrap();
1102 }
1103}
1104
/// File storage for Windows targets using positional reads and writes.
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Size recorded at open and updated on commit; shared between clones.
    size: Arc<Mutex<u64>>,
    /// Open file handle.
    f: fs::File,
}

#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Open (creating if necessary) the file named `filename`.
    ///
    /// # Panics
    /// Panics if the file cannot be opened or its length cannot be determined.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        // The initial size is the position of the end of the file.
        let len = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}

#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// The returned handle shares the recorded size and uses a `try_clone` of the file handle.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1137
1138#[cfg(target_family = "windows")]
1139use std::os::windows::fs::FileExt;
1140
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read into `data` from offset `start`.
    ///
    /// `seek_read` may return fewer bytes than requested, so this loops until the
    /// buffer is full or a read returns zero bytes. Bytes of `data` not reached
    /// are left unchanged.
    ///
    /// # Panics
    /// Panics on an I/O error rather than silently returning unfilled data.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // Nothing more to read.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("seek_read failed: {:?}", e),
            }
        }
    }

    /// Write all of `data` at offset `start`.
    ///
    /// `seek_write` may write only part of the buffer, so this loops until every
    /// byte has been written.
    ///
    /// # Panics
    /// Panics on an I/O error, or if a write makes no progress.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => panic!("seek_write wrote zero bytes"),
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("seek_write failed: {:?}", e),
            }
        }
    }

    /// Size recorded at open or by the most recent commit.
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record `size`, set the file length to it, and sync data and metadata to disk.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1161
/// Storage type selected by target family: `WindowsFileStorage` on Windows.
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Storage type selected by target family: `UnixFileStorage` on Unix.
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Storage type selected by target family: `AnyFileStorage` on other targets.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// On Windows and Unix this is the same type as `MultiFileStorage`.
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// On targets that are neither Windows nor Unix this is `UpdFileStorage`.
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1181
/// Test scale factor taken from the `TA` environment variable; defaults to 1 when
/// the variable is not set. Panics if the value is not a valid `usize`.
#[cfg(test)]
fn test_amount() -> usize {
    let raw = std::env::var("TA").unwrap_or_else(|_| String::from("1"));
    raw.parse::<usize>().unwrap()
}
1187
/// Randomised check that an `AtomicFile` over `MemFile`s returns the same bytes
/// as a plain `MemFile` given the same sequence of writes, reads and commits.
#[test]
fn test_atomic_file() {
    use rand::Rng;
    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            if rng.r#gen::<bool>() {
                // Write the same random bytes to both storages.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                s1.write(off, &bytes);
                s2.write(off, &bytes);
            } else {
                // Read the same range from both storages and compare.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off, &mut b2);
                s2.read(off, &mut b3);
                assert!(b2 == b3);
            }
            // Commit both storages on roughly one iteration in fifty.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}