1#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{VecA, collections::{BTreeMapA,btree_map::CustomTuning}, localalloc::GTemp};
24
25#[cfg(feature = "pstd")]
26type BTreeMap<K,V> = BTreeMapA<K,V,CustomTuning<GTemp>>;
27
28#[cfg(not(feature = "pstd"))]
29use std::collections::BTreeMap;
30
31#[cfg(feature = "pstd")]
32type GVec<T> = VecA<T, GTemp>;
33
34#[cfg(not(feature = "pstd"))]
35type GVec<T> = Vec<T>;
36
/// Shared byte buffer (`Arc<Vec<u8>>`) handed to [`BasicStorage::write_data`] together with
/// an offset and a length selecting the bytes to write.
pub type Data = Arc<Vec<u8>>;
39
40pub struct AtomicFile {
57 map: WMap,
59 cf: Arc<RwLock<CommitFile>>,
61 size: u64,
63 tx: std::sync::mpsc::Sender<(u64, WMap)>,
65 busy: Arc<Mutex<()>>,
67 map_lim: usize,
69}
70
71impl AtomicFile {
72 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
74 Self::new_with_limits(stg, upd, &Limits::default())
75 }
76
77 pub fn new_with_limits(
79 stg: Box<dyn Storage>,
80 upd: Box<dyn BasicStorage>,
81 lim: &Limits,
82 ) -> Box<Self> {
83 let size = stg.size();
84 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
85
86 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
87 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
88 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
92
93 std::thread::spawn(move || {
94 while let Ok((size, map)) = rx.recv() {
96 let _lock = busy1.lock();
97 baf.map = map;
98 baf.commit(size);
99 cf1.write().unwrap().done_one();
100 }
101 });
102 Box::new(Self {
103 map: WMap::default(),
104 cf,
105 size,
106 tx,
107 busy,
108 map_lim: lim.map_lim,
109 })
110 }
111}
112
113impl Storage for AtomicFile {
114 fn clone(&self) -> Box<dyn Storage> {
115 panic!()
116 }
117}
118
119impl BasicStorage for AtomicFile {
120 fn commit(&mut self, size: u64) {
121 self.size = size;
122 if self.map.is_empty() {
123 return;
124 }
125 if self.cf.read().unwrap().map.len() > self.map_lim {
126 self.wait_complete();
127 }
128 let map = std::mem::take(&mut self.map);
129 let cf = &mut *self.cf.write().unwrap();
130 cf.todo += 1;
131 map.to_storage(cf);
133 self.tx.send((size, map)).unwrap();
135 }
136
137 fn size(&self) -> u64 {
138 self.size
139 }
140
141 fn read(&self, start: u64, data: &mut [u8]) {
142 self.map.read(start, data, &*self.cf.read().unwrap());
143 }
144
145 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
146 self.map.write(start, data, off, len);
147 }
148
149 fn write(&mut self, start: u64, data: &[u8]) {
150 let len = data.len();
151 let d = Arc::new(data.to_vec());
152 self.write_data(start, d, 0, len);
153 }
154
155 fn wait_complete(&self) {
156 while self.cf.read().unwrap().todo != 0 {
157 let _x = self.busy.lock();
158 }
159 }
160}
161
162struct CommitFile {
163 stg: ReadBufStg<256>,
165 map: WMap,
167 todo: usize,
169}
170
171impl CommitFile {
172 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
173 Self {
174 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
175 map: WMap::default(),
176 todo: 0,
177 }
178 }
179
180 fn done_one(&mut self) {
181 self.todo -= 1;
182 if self.todo == 0 {
183 self.map = WMap::default();
184 self.stg.reset();
185 }
186 }
187}
188
189impl BasicStorage for CommitFile {
190 fn commit(&mut self, _size: u64) {
191 panic!()
192 }
193
194 fn size(&self) -> u64 {
195 panic!()
196 }
197
198 fn read(&self, start: u64, data: &mut [u8]) {
199 self.map.read(start, data, &self.stg);
200 }
201
202 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
203 self.map.write(start, data, off, len);
204 }
205
206 fn write(&mut self, _start: u64, _data: &[u8]) {
207 panic!()
208 }
209}
210
211pub trait BasicStorage: Send {
215 fn size(&self) -> u64;
218
219 fn read(&self, start: u64, data: &mut [u8]);
221
222 fn write(&mut self, start: u64, data: &[u8]);
224
225 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
227 let len = data.len();
228 let d = Arc::new(data);
229 self.write_data(start, d, 0, len);
230 }
231
232 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
234 self.write(start, &data[off..off + len]);
235 }
236
237 fn commit(&mut self, size: u64);
239
240 fn write_u64(&mut self, start: u64, value: u64) {
242 self.write(start, &value.to_le_bytes());
243 }
244
245 fn read_u64(&self, start: u64) -> u64 {
247 let mut bytes = [0; 8];
248 self.read(start, &mut bytes);
249 u64::from_le_bytes(bytes)
250 }
251
252 fn wait_complete(&self) {}
254}
255
256pub trait Storage: BasicStorage + Sync {
258 fn clone(&self) -> Box<dyn Storage>;
260}
261
262#[derive(Default)]
264pub struct MemFile {
265 v: Arc<Mutex<Vec<u8>>>,
266}
267
268impl MemFile {
269 pub fn new() -> Box<Self> {
271 Box::default()
272 }
273}
274
275impl Storage for MemFile {
276 fn clone(&self) -> Box<dyn Storage> {
277 Box::new(Self { v: self.v.clone() })
278 }
279}
280
281impl BasicStorage for MemFile {
282 fn size(&self) -> u64 {
283 let v = self.v.lock().unwrap();
284 v.len() as u64
285 }
286
287 fn read(&self, off: u64, bytes: &mut [u8]) {
288 let off = off as usize;
289 let len = bytes.len();
290 let mut v = self.v.lock().unwrap();
291 if off + len > v.len() {
292 v.resize(off + len, 0);
293 }
294 bytes.copy_from_slice(&v[off..off + len]);
295 }
296
297 fn write(&mut self, off: u64, bytes: &[u8]) {
298 let off = off as usize;
299 let len = bytes.len();
300 let mut v = self.v.lock().unwrap();
301 if off + len > v.len() {
302 v.resize(off + len, 0);
303 }
304 v[off..off + len].copy_from_slice(bytes);
305 }
306
307 fn commit(&mut self, size: u64) {
308 let mut v = self.v.lock().unwrap();
309 v.resize(size as usize, 0);
310 }
311}
312
313use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
314
/// Thin wrapper around an open [`fs::File`] providing positioned read/write helpers.
///
/// All I/O failures panic.
struct FileInner {
    /// The open file handle.
    f: fs::File,
}

impl FileInner {
    /// Open `filename` for reading and writing, creating it if it does not exist.
    ///
    /// Existing content is kept (the file is not truncated).
    ///
    /// # Panics
    /// Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Return the file size, obtained by seeking to the end of the file.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read into `bytes` starting at offset `off`.
    ///
    /// A single `read` call may return fewer bytes than requested even before the end of
    /// the file, so reading is repeated until the buffer is full or end-of-file is reached.
    /// Bytes beyond end-of-file are left untouched.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            match self.f.read(&mut bytes[done..]) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file read failed: {}", e),
            }
        }
    }

    /// Write all of `bytes` at offset `off`.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // On platforms other than Windows and Linux, explicitly extend the file
        // when the write starts beyond its current end.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // `write` may accept only part of the buffer; `write_all` keeps going until
        // everything has been written.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to `size` and sync the file to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
360
361pub struct UpdFileStorage {
363 file: Cell<Option<FileInner>>,
364}
365
366impl UpdFileStorage {
367 pub fn new(filename: &str) -> Box<Self> {
369 Box::new(Self {
370 file: Cell::new(Some(FileInner::new(filename))),
371 })
372 }
373}
374
375impl BasicStorage for UpdFileStorage {
376 fn size(&self) -> u64 {
377 let mut f = self.file.take().unwrap();
378 let result = f.size();
379 self.file.set(Some(f));
380 result
381 }
382 fn read(&self, off: u64, bytes: &mut [u8]) {
383 let mut f = self.file.take().unwrap();
384 f.read(off, bytes);
385 self.file.set(Some(f));
386 }
387
388 fn write(&mut self, off: u64, bytes: &[u8]) {
389 let mut f = self.file.take().unwrap();
390 f.write(off, bytes);
391 self.file.set(Some(f));
392 }
393
394 fn commit(&mut self, size: u64) {
395 let mut f = self.file.take().unwrap();
396 f.commit(size);
397 self.file.set(Some(f));
398 }
399}
400
401pub struct SimpleFileStorage {
403 file: Arc<Mutex<FileInner>>,
404}
405
406impl SimpleFileStorage {
407 pub fn new(filename: &str) -> Box<Self> {
409 Box::new(Self {
410 file: Arc::new(Mutex::new(FileInner::new(filename))),
411 })
412 }
413}
414
415impl Storage for SimpleFileStorage {
416 fn clone(&self) -> Box<dyn Storage> {
417 Box::new(Self {
418 file: self.file.clone(),
419 })
420 }
421}
422
423impl BasicStorage for SimpleFileStorage {
424 fn size(&self) -> u64 {
425 self.file.lock().unwrap().size()
426 }
427
428 fn read(&self, off: u64, bytes: &mut [u8]) {
429 self.file.lock().unwrap().read(off, bytes);
430 }
431
432 fn write(&mut self, off: u64, bytes: &[u8]) {
433 self.file.lock().unwrap().write(off, bytes);
434 }
435
436 fn commit(&mut self, size: u64) {
437 self.file.lock().unwrap().commit(size);
438 }
439}
440
441pub struct AnyFileStorage {
443 filename: String,
444 files: Arc<Mutex<Vec<FileInner>>>,
445}
446
447impl AnyFileStorage {
448 pub fn new(filename: &str) -> Box<Self> {
450 Box::new(Self {
451 filename: filename.to_owned(),
452 files: Arc::new(Mutex::new(Vec::new())),
453 })
454 }
455
456 fn get_file(&self) -> FileInner {
457 match self.files.lock().unwrap().pop() {
458 Some(f) => f,
459 _ => FileInner::new(&self.filename),
460 }
461 }
462
463 fn put_file(&self, f: FileInner) {
464 self.files.lock().unwrap().push(f);
465 }
466}
467
468impl Storage for AnyFileStorage {
469 fn clone(&self) -> Box<dyn Storage> {
470 Box::new(Self {
471 filename: self.filename.clone(),
472 files: self.files.clone(),
473 })
474 }
475}
476
477impl BasicStorage for AnyFileStorage {
478 fn size(&self) -> u64 {
479 let mut f = self.get_file();
480 let result = f.size();
481 self.put_file(f);
482 result
483 }
484
485 fn read(&self, off: u64, bytes: &mut [u8]) {
486 let mut f = self.get_file();
487 f.read(off, bytes);
488 self.put_file(f);
489 }
490
491 fn write(&mut self, off: u64, bytes: &[u8]) {
492 let mut f = self.get_file();
493 f.write(off, bytes);
494 self.put_file(f);
495 }
496
497 fn commit(&mut self, size: u64) {
498 let mut f = self.get_file();
499 f.commit(size);
500 self.put_file(f);
501 }
502}
503
504pub struct DummyFile {}
506impl DummyFile {
507 pub fn new() -> Box<Self> {
509 Box::new(Self {})
510 }
511}
512
513impl Storage for DummyFile {
514 fn clone(&self) -> Box<dyn Storage> {
515 Self::new()
516 }
517}
518
519impl BasicStorage for DummyFile {
520 fn size(&self) -> u64 {
521 0
522 }
523
524 fn read(&self, _off: u64, _bytes: &mut [u8]) {}
525
526 fn write(&mut self, _off: u64, _bytes: &[u8]) {}
527
528 fn commit(&mut self, _size: u64) {}
529}
530
/// Tuning limits for [`AtomicFile`] and [`BasicAtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Number of entries in the shared commit map above which a commit waits for the
    /// background work to complete.
    pub map_lim: usize,
    /// Memory budget in bytes for the read buffer (divided into 256-byte sectors).
    pub rbuf_mem: usize,
    /// Size in bytes of the write buffer in front of the main storage.
    pub swbuf: usize,
    /// Size in bytes of the write buffer in front of the update storage.
    pub uwbuf: usize,
}

impl Default for Limits {
    /// Defaults: `map_lim` 5000, `rbuf_mem` 2 MiB, `swbuf` and `uwbuf` 1 MiB each.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        Limits {
            map_lim: 5000,
            rbuf_mem: 2 * MIB,
            swbuf: MIB,
            uwbuf: MIB,
        }
    }
}
554
555struct WriteBuffer {
557 ix: usize,
559 pos: u64,
561 pub stg: Box<dyn BasicStorage>,
563 buf: Vec<u8>,
565}
566
567impl WriteBuffer {
568 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
570 Self {
571 ix: 0,
572 pos: u64::MAX,
573 stg,
574 buf: vec![0; buf_size],
575 }
576 }
577
578 pub fn write(&mut self, off: u64, data: &[u8]) {
580 if self.pos + self.ix as u64 != off {
581 self.flush(off);
582 }
583 let mut done: usize = 0;
584 let mut todo: usize = data.len();
585 while todo > 0 {
586 let mut n: usize = self.buf.len() - self.ix;
587 if n == 0 {
588 self.flush(off + done as u64);
589 n = self.buf.len();
590 }
591 if n > todo {
592 n = todo;
593 }
594 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
595 todo -= n;
596 done += n;
597 self.ix += n;
598 }
599 }
600
601 fn flush(&mut self, new_pos: u64) {
602 if self.ix > 0 {
603 self.stg.write(self.pos, &self.buf[0..self.ix]);
604 }
605 self.ix = 0;
606 self.pos = new_pos;
607 }
608
609 pub fn commit(&mut self, size: u64) {
611 self.flush(u64::MAX);
612 self.stg.commit(size);
613 }
614
615 pub fn write_u64(&mut self, start: u64, value: u64) {
617 self.write(start, &value.to_le_bytes());
618 }
619}
620
621struct ReadBufStg<const N: usize> {
627 stg: Box<dyn Storage>,
629 buf: Mutex<ReadBuffer<N>>,
631 limit: usize,
633}
634
635impl<const N: usize> Drop for ReadBufStg<N> {
636 fn drop(&mut self) {
637 self.reset();
638 }
639}
640
641impl<const N: usize> ReadBufStg<N> {
642 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
644 Self {
645 stg,
646 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
647 limit,
648 }
649 }
650
651 fn reset(&mut self) {
653 self.buf.lock().unwrap().reset();
654 }
655}
656
657impl<const N: usize> BasicStorage for ReadBufStg<N> {
658 fn read(&self, start: u64, data: &mut [u8]) {
660 if data.len() <= self.limit {
661 self.buf.lock().unwrap().read(&*self.stg, start, data);
662 } else {
663 self.stg.read(start, data);
664 }
665 }
666
667 fn size(&self) -> u64 {
669 panic!()
670 }
671
672 fn write(&mut self, _start: u64, _data: &[u8]) {
674 panic!();
675 }
676
677 fn commit(&mut self, _size: u64) {
679 panic!();
680 }
681}
682
683struct ReadBuffer<const N: usize> {
684 map: HashMap<u64, Box<[u8; N]>>,
686 max_buf: usize,
688}
689
690impl<const N: usize> ReadBuffer<N> {
691 fn new(max_buf: usize) -> Self {
692 Self {
693 map: HashMap::default(),
694 max_buf,
695 }
696 }
697
698 fn reset(&mut self) {
699 self.map.clear();
700 }
701
702 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
703 let mut done = 0;
704 while done < data.len() {
705 let off = off + done as u64;
706 let sector = off / N as u64;
707 let disp = (off % N as u64) as usize;
708 let amount = min(data.len() - done, N - disp);
709
710 let p = self.map.entry(sector).or_insert_with(|| {
711 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
712 stg.read(sector * N as u64, &mut *p);
713 p
714 });
715 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
716 done += amount;
717 }
718 if self.map.len() >= self.max_buf {
719 self.reset();
720 }
721 }
722}
723
724#[derive(Default)]
725struct DataSlice {
727 pub data: Data,
729 pub off: usize,
731 pub len: usize,
733}
734
735impl DataSlice {
736 pub fn all(&self) -> &[u8] {
738 &self.data[self.off..self.off + self.len]
739 }
740 pub fn part(&self, off: usize, len: usize) -> &[u8] {
742 &self.data[self.off + off..self.off + off + len]
743 }
744 pub fn trim(&mut self, trim: usize) {
746 self.off += trim;
747 self.len -= trim;
748 }
749 #[allow(dead_code)]
751 pub fn take(&mut self) -> Data {
752 std::mem::take(&mut self.data)
753 }
754}
755
756#[derive(Default)]
757struct WMap {
759 map: BTreeMap<u64, DataSlice>,
761}
762
763impl WMap {
764 pub fn is_empty(&self) -> bool {
766 self.map.is_empty()
767 }
768
769 pub fn len(&self) -> usize {
771 self.map.len()
772 }
773
774 pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
776 let map = std::mem::take(&mut self.map);
777 let mut result = GVec::with_capacity(map.len());
778 for (end, v) in map {
779 let start = end - v.len as u64;
780 result.push((start, v));
781 }
782 result
783 }
784
785 pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
787 for (end, v) in self.map.iter() {
788 let start = end - v.len as u64;
789 stg.write_data(start, v.data.clone(), v.off, v.len);
790 }
791 }
792
793 #[cfg(not(feature = "pstd"))]
794 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
796 if len != 0 {
797 let (mut insert, mut remove) = (Vec::new(), Vec::new());
798 let end = start + len as u64;
799 for (ee, v) in self.map.range_mut(start + 1..) {
800 let ee = *ee;
801 let es = ee - v.len as u64; if es >= end {
803 break;
805 } else if start <= es {
806 if end < ee {
807 v.trim((end - es) as usize);
809 break;
810 }
811 remove.push(ee);
813 } else if end < ee {
814 insert.push((es, v.data.clone(), v.off, (start - es) as usize));
817 v.trim((end - es) as usize);
818 break;
819 } else {
820 insert.push((es, v.take(), v.off, (start - es) as usize));
823 remove.push(ee);
824 }
825 }
826 for end in remove {
827 self.map.remove(&end);
828 }
829 for (start, data, off, len) in insert {
830 self.map
831 .insert(start + len as u64, DataSlice { data, off, len });
832 }
833 self.map
834 .insert(start + len as u64, DataSlice { data, off, len });
835 }
836 }
837
838 #[cfg(feature = "pstd")]
839 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
841 if len != 0 {
842 let end = start + len as u64;
843 let mut c = self
844 .map
845 .lower_bound_mut(std::ops::Bound::Excluded(&start))
846 .with_mutable_key();
847 while let Some((eend, v)) = c.next() {
848 let ee = *eend;
849 let es = ee - v.len as u64; if es >= end {
851 c.prev();
853 break;
854 } else if start <= es {
855 if end < ee {
856 v.trim((end - es) as usize);
858 c.prev();
859 break;
860 }
861 c.remove_prev();
863 } else if end < ee {
864 let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
867 v.trim((end - es) as usize);
868 c.prev();
869 c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
870 break;
871 } else {
872 v.len = (start - es) as usize;
875 *eend = es + v.len as u64;
876 }
877 }
878 c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
880 }
881 }
882
883 pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
885 let len = data.len();
886 if len != 0 {
887 let mut done = 0;
888 for (&end, v) in self.map.range(start + 1..) {
889 let es = end - v.len as u64; let doff = start + done as u64;
891 if es > doff {
892 let a = min(len - done, (es - doff) as usize);
894 u.read(doff, &mut data[done..done + a]);
895 done += a;
896 if done == len {
897 return;
898 }
899 }
900 let skip = (start + done as u64 - es) as usize;
902 let a = min(len - done, v.len - skip);
903 data[done..done + a].copy_from_slice(v.part(skip, a));
904 done += a;
905 if done == len {
906 return;
907 }
908 }
909 u.read(start + done as u64, &mut data[done..]);
910 }
911 }
912}
913
914pub struct BasicAtomicFile {
916 stg: WriteBuffer,
918 upd: WriteBuffer,
920 map: WMap,
922 list: GVec<(u64, DataSlice)>,
924 size: u64,
925}
926
927impl BasicAtomicFile {
928 pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
930 let size = stg.size();
931 let mut result = Box::new(Self {
932 stg: WriteBuffer::new(stg, lim.swbuf),
933 upd: WriteBuffer::new(upd, lim.uwbuf),
934 map: WMap::default(),
935 list: GVec::new(),
936 size,
937 });
938 result.init();
939 result
940 }
941
942 fn init(&mut self) {
944 let end = self.upd.stg.read_u64(0);
945 let size = self.upd.stg.read_u64(8);
946 if end == 0 {
947 return;
948 }
949 assert!(end == self.upd.stg.size());
950 let mut pos = 16;
951 while pos < end {
952 let start = self.upd.stg.read_u64(pos);
953 pos += 8;
954 let len = self.upd.stg.read_u64(pos);
955 pos += 8;
956 let mut buf = vec![0; len as usize];
957 self.upd.stg.read(pos, &mut buf);
958 pos += len;
959 self.stg.write(start, &buf);
960 }
961 self.stg.commit(size);
962 self.upd.commit(0);
963 }
964
965 pub fn commit_phase(&mut self, size: u64, phase: u8) {
967 if self.map.is_empty() && self.list.is_empty() {
968 return;
969 }
970 if phase == 1 {
971 self.list = self.map.convert_to_vec();
972
973 self.upd.write_u64(0, 0);
976 self.upd.write_u64(8, size);
977 self.upd.commit(16); let mut stg_written = false;
981 let mut pos: u64 = 16;
982 for (start, v) in self.list.iter() {
983 let (start, len, data) = (*start, v.len as u64, v.all());
984 if start >= self.size {
985 stg_written = true;
987 self.stg.write(start, data);
988 } else {
989 self.upd.write_u64(pos, start);
990 pos += 8;
991 self.upd.write_u64(pos, len);
992 pos += 8;
993 self.upd.write(pos, data);
994 pos += len;
995 }
996 }
997 if stg_written {
998 self.stg.commit(size);
999 }
1000 self.upd.commit(pos); self.upd.write_u64(0, pos);
1004 self.upd.write_u64(8, size);
1005 self.upd.commit(pos);
1006 } else {
1007 for (start, v) in self.list.iter() {
1008 if *start < self.size {
1009 self.stg.write(*start, v.all());
1011 }
1012 }
1013 self.list = GVec::new();
1014 self.stg.commit(size);
1015 self.upd.commit(0);
1016 }
1017 }
1018}
1019
1020impl BasicStorage for BasicAtomicFile {
1021 fn commit(&mut self, size: u64) {
1022 self.commit_phase(size, 1);
1023 self.commit_phase(size, 2);
1024 self.size = size;
1025 }
1026
1027 fn size(&self) -> u64 {
1028 self.size
1029 }
1030
1031 fn read(&self, start: u64, data: &mut [u8]) {
1032 self.map.read(start, data, &*self.stg.stg);
1033 }
1034
1035 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1036 self.map.write(start, data, off, len);
1037 }
1038
1039 fn write(&mut self, start: u64, data: &[u8]) {
1040 let len = data.len();
1041 let d = Arc::new(data.to_vec());
1042 self.write_data(start, d, 0, len);
1043 }
1044}
1045
1046#[cfg(target_family = "unix")]
1048pub struct UnixFileStorage {
1049 size: Arc<Mutex<u64>>,
1050 f: fs::File,
1051}
1052#[cfg(target_family = "unix")]
1053impl UnixFileStorage {
1054 pub fn new(filename: &str) -> Box<Self> {
1056 let mut f = OpenOptions::new()
1057 .read(true)
1058 .write(true)
1059 .create(true)
1060 .truncate(false)
1061 .open(filename)
1062 .unwrap();
1063 let size = f.seek(SeekFrom::End(0)).unwrap();
1064 let size = Arc::new(Mutex::new(size));
1065 Box::new(Self { size, f })
1066 }
1067}
1068
1069#[cfg(target_family = "unix")]
1070impl Storage for UnixFileStorage {
1071 fn clone(&self) -> Box<dyn Storage> {
1072 Box::new(Self {
1073 size: self.size.clone(),
1074 f: self.f.try_clone().unwrap(),
1075 })
1076 }
1077}
1078
1079#[cfg(target_family = "unix")]
1080use std::os::unix::fs::FileExt;
1081
1082#[cfg(target_family = "unix")]
1083impl BasicStorage for UnixFileStorage {
1084 fn read(&self, start: u64, data: &mut [u8]) {
1085 let _ = self.f.read_at(data, start);
1086 }
1087
1088 fn write(&mut self, start: u64, data: &[u8]) {
1089 let _ = self.f.write_at(data, start);
1090 }
1091
1092 fn size(&self) -> u64 {
1093 *self.size.lock().unwrap()
1094 }
1095
1096 fn commit(&mut self, size: u64) {
1097 *self.size.lock().unwrap() = size;
1098 self.f.set_len(size).unwrap();
1099 self.f.sync_all().unwrap();
1100 }
1101}
1102
/// File storage for Windows targets using positioned reads and writes.
///
/// Clones share the recorded size and use a duplicated file handle.
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Size recorded at open time and updated by every commit; shared between clones.
    size: Arc<Mutex<u64>>,
    /// The open file.
    f: fs::File,
}

#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Open `filename` for reading and writing (creating it if needed, without truncating).
    ///
    /// # Panics
    /// Panics if the file cannot be opened or its size cannot be determined.
    pub fn new(filename: &str) -> Box<Self> {
        let mut f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        let size = f.seek(SeekFrom::End(0)).unwrap();
        let size = Arc::new(Mutex::new(size));
        Box::new(Self { size, f })
    }
}

#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Return a handle sharing the recorded size, with a duplicated file handle.
    fn clone(&self) -> Box<dyn Storage> {
        Box::new(Self {
            size: self.size.clone(),
            f: self.f.try_clone().unwrap(),
        })
    }
}

#[cfg(target_family = "windows")]
use std::os::windows::fs::FileExt;

#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read into `data` from offset `start`.
    ///
    /// `seek_read` may return fewer bytes than requested, so it is repeated until the
    /// buffer is full or end-of-file is reached. Bytes beyond end-of-file are left untouched.
    ///
    /// # Panics
    /// Panics on an I/O error (previously such errors were silently ignored).
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage read failed: {}", e),
            }
        }
    }

    /// Write all of `data` at offset `start`.
    ///
    /// `seek_write` may write only part of the buffer, so it is repeated until everything
    /// has been written.
    ///
    /// # Panics
    /// Panics if the data cannot be written in full (previously a failed or partial write
    /// was silently ignored).
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => panic!("WindowsFileStorage write failed: wrote zero bytes"),
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage write failed: {}", e),
            }
        }
    }

    /// The recorded size (from open time or the latest commit).
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record `size`, set the file length to it and sync the file.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1159
1160#[cfg(target_family = "windows")]
1162pub type MultiFileStorage = WindowsFileStorage;
1163
1164#[cfg(target_family = "unix")]
1166pub type MultiFileStorage = UnixFileStorage;
1167
1168#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1170pub type MultiFileStorage = AnyFileStorage;
1171
1172#[cfg(any(target_family = "windows", target_family = "unix"))]
1174pub type FastFileStorage = MultiFileStorage;
1175
1176#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1178pub type FastFileStorage = UpdFileStorage;
1179
/// Test workload multiplier: the value of the `TA` environment variable, or 1 when it is
/// not available. Panics if `TA` is set but is not a valid `usize`.
#[cfg(test)]
fn test_amount() -> usize {
    match std::env::var("TA") {
        Ok(text) => text.parse().unwrap(),
        Err(_) => 1,
    }
}
1185
/// Randomised check that `AtomicFile` over `MemFile`s behaves like a plain `MemFile`:
/// the same random writes, reads and commits are applied to both and every read is compared.
#[test]
fn test_atomic_file() {
    use rand::Rng;
    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            if rng.r#gen::<bool>() {
                // Write the same random bytes to both storages.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                s1.write(off, &bytes);
                s2.write(off, &bytes);
            } else {
                // Read the same range from both and compare.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off, &mut b2);
                s2.read(off, &mut b3);
                assert!(b2 == b3);
            }
            // Commit both storages roughly once every 50 operations.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}