1#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{collections::{BTreeMapA,btree_map::CustomTuning}, localalloc::GTemp};
24
25#[cfg(feature = "pstd")]
26type BTreeMap<K,V> = BTreeMapA<K,V,CustomTuning<GTemp>>;
27
28#[cfg(not(feature = "pstd"))]
29use std::collections::BTreeMap;
30
/// Shared, reference-counted byte buffer (`Arc<Vec<u8>>`).
///
/// Write data is passed around as `Data` plus an offset and length, so the
/// same allocation can be referenced from several places by cloning the `Arc`.
pub type Data = Arc<Vec<u8>>;
33
34pub struct AtomicFile {
51 map: WMap,
53 cf: Arc<RwLock<CommitFile>>,
55 size: u64,
57 tx: std::sync::mpsc::Sender<(u64, WMap)>,
59 busy: Arc<Mutex<()>>,
61 map_lim: usize,
63}
64
65impl AtomicFile {
66 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
68 Self::new_with_limits(stg, upd, &Limits::default())
69 }
70
71 pub fn new_with_limits(
73 stg: Box<dyn Storage>,
74 upd: Box<dyn BasicStorage>,
75 lim: &Limits,
76 ) -> Box<Self> {
77 let size = stg.size();
78 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
79
80 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
81 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
82 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
86
87 std::thread::spawn(move || {
88 while let Ok((size, map)) = rx.recv() {
90 let _lock = busy1.lock();
91 baf.map = map;
92 baf.commit(size);
93 cf1.write().unwrap().done_one();
94 }
95 });
96 Box::new(Self {
97 map: WMap::default(),
98 cf,
99 size,
100 tx,
101 busy,
102 map_lim: lim.map_lim,
103 })
104 }
105}
106
impl Storage for AtomicFile {
    /// Not supported for `AtomicFile`: this always panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
112
113impl BasicStorage for AtomicFile {
114 fn commit(&mut self, size: u64) {
115 self.size = size;
116 if self.map.is_empty() {
117 return;
118 }
119 if self.cf.read().unwrap().map.len() > self.map_lim {
120 self.wait_complete();
121 }
122 let map = std::mem::take(&mut self.map);
123 let cf = &mut *self.cf.write().unwrap();
124 cf.todo += 1;
125 map.to_storage(cf);
127 self.tx.send((size, map)).unwrap();
129 }
130
131 fn size(&self) -> u64 {
132 self.size
133 }
134
135 fn read(&self, start: u64, data: &mut [u8]) {
136 self.map.read(start, data, &*self.cf.read().unwrap());
137 }
138
139 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
140 self.map.write(start, data, off, len);
141 }
142
143 fn write(&mut self, start: u64, data: &[u8]) {
144 let len = data.len();
145 let d = Arc::new(data.to_vec());
146 self.write_data(start, d, 0, len);
147 }
148
149 fn wait_complete(&self) {
150 while self.cf.read().unwrap().todo != 0 {
151 let _x = self.busy.lock();
152 }
153 }
154}
155
156struct CommitFile {
157 stg: ReadBufStg<256>,
159 map: WMap,
161 todo: usize,
163}
164
165impl CommitFile {
166 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
167 Self {
168 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
169 map: WMap::default(),
170 todo: 0,
171 }
172 }
173
174 fn done_one(&mut self) {
175 self.todo -= 1;
176 if self.todo == 0 {
177 self.map = WMap::default();
178 self.stg.reset();
179 }
180 }
181}
182
/// Only `read` and `write_data` are supported; the other methods panic.
impl BasicStorage for CommitFile {
    /// Not supported: always panics.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported: always panics.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Read from the held writes, falling back to the buffered storage.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &self.stg);
    }

    /// Record `data[off..off + len]` as written at `start`.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported: always panics.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
204
205pub trait BasicStorage: Send {
209 fn size(&self) -> u64;
212
213 fn read(&self, start: u64, data: &mut [u8]);
215
216 fn write(&mut self, start: u64, data: &[u8]);
218
219 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
221 let len = data.len();
222 let d = Arc::new(data);
223 self.write_data(start, d, 0, len);
224 }
225
226 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
228 self.write(start, &data[off..off + len]);
229 }
230
231 fn commit(&mut self, size: u64);
233
234 fn write_u64(&mut self, start: u64, value: u64) {
236 self.write(start, &value.to_le_bytes());
237 }
238
239 fn read_u64(&self, start: u64) -> u64 {
241 let mut bytes = [0; 8];
242 self.read(start, &mut bytes);
243 u64::from_le_bytes(bytes)
244 }
245
246 fn wait_complete(&self) {}
248}
249
/// A [`BasicStorage`] that is also `Sync` and can produce another boxed handle
/// via `clone`.
pub trait Storage: BasicStorage + Sync {
    /// Return a new boxed `Storage` handle derived from this one.
    fn clone(&self) -> Box<dyn Storage>;
}
255
/// In-memory storage backed by a shared byte vector.
#[derive(Default)]
pub struct MemFile {
    /// The stored bytes, shared between handles.
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Create a new, empty `MemFile`.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
268
269impl Storage for MemFile {
270 fn clone(&self) -> Box<dyn Storage> {
271 Box::new(Self { v: self.v.clone() })
272 }
273}
274
275impl BasicStorage for MemFile {
276 fn size(&self) -> u64 {
277 let v = self.v.lock().unwrap();
278 v.len() as u64
279 }
280
281 fn read(&self, off: u64, bytes: &mut [u8]) {
282 let off = off as usize;
283 let len = bytes.len();
284 let mut v = self.v.lock().unwrap();
285 if off + len > v.len() {
286 v.resize(off + len, 0);
287 }
288 bytes.copy_from_slice(&v[off..off + len]);
289 }
290
291 fn write(&mut self, off: u64, bytes: &[u8]) {
292 let off = off as usize;
293 let len = bytes.len();
294 let mut v = self.v.lock().unwrap();
295 if off + len > v.len() {
296 v.resize(off + len, 0);
297 }
298 v[off..off + len].copy_from_slice(bytes);
299 }
300
301 fn commit(&mut self, size: u64) {
302 let mut v = self.v.lock().unwrap();
303 v.resize(size as usize, 0);
304 }
305}
306
307use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
308
/// Thin wrapper around an open file providing positioned read/write helpers.
///
/// All I/O errors cause a panic.
struct FileInner {
    /// The open file handle.
    f: fs::File,
}

impl FileInner {
    /// Open `filename` for reading and writing, creating it if it does not
    /// exist. Existing contents are kept.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Size of the file, found by seeking to its end.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read into `bytes` starting at file offset `off`.
    ///
    /// A single `read` call may return fewer bytes than requested, so reading
    /// continues until the buffer is full or end of file is reached. Bytes past
    /// end of file are left untouched in `bytes`.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut filled = 0;
        while filled < bytes.len() {
            match self.f.read(&mut bytes[filled..]) {
                Ok(0) => break, // end of file
                Ok(n) => filled += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file read failed: {:?}", e),
            }
        }
    }

    /// Write all of `bytes` at file offset `off`.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // On targets other than Windows and Linux the file is explicitly
        // extended up to `off` before writing past the current end.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // `write` may accept only part of the buffer; `write_all` keeps going
        // until every byte has been written.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to `size` and sync the file to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
354
355pub struct UpdFileStorage {
357 file: Cell<Option<FileInner>>,
358}
359
360impl UpdFileStorage {
361 pub fn new(filename: &str) -> Box<Self> {
363 Box::new(Self {
364 file: Cell::new(Some(FileInner::new(filename))),
365 })
366 }
367}
368
369impl BasicStorage for UpdFileStorage {
370 fn size(&self) -> u64 {
371 let mut f = self.file.take().unwrap();
372 let result = f.size();
373 self.file.set(Some(f));
374 result
375 }
376 fn read(&self, off: u64, bytes: &mut [u8]) {
377 let mut f = self.file.take().unwrap();
378 f.read(off, bytes);
379 self.file.set(Some(f));
380 }
381
382 fn write(&mut self, off: u64, bytes: &[u8]) {
383 let mut f = self.file.take().unwrap();
384 f.write(off, bytes);
385 self.file.set(Some(f));
386 }
387
388 fn commit(&mut self, size: u64) {
389 let mut f = self.file.take().unwrap();
390 f.commit(size);
391 self.file.set(Some(f));
392 }
393}
394
395pub struct SimpleFileStorage {
397 file: Arc<Mutex<FileInner>>,
398}
399
400impl SimpleFileStorage {
401 pub fn new(filename: &str) -> Box<Self> {
403 Box::new(Self {
404 file: Arc::new(Mutex::new(FileInner::new(filename))),
405 })
406 }
407}
408
409impl Storage for SimpleFileStorage {
410 fn clone(&self) -> Box<dyn Storage> {
411 Box::new(Self {
412 file: self.file.clone(),
413 })
414 }
415}
416
417impl BasicStorage for SimpleFileStorage {
418 fn size(&self) -> u64 {
419 self.file.lock().unwrap().size()
420 }
421
422 fn read(&self, off: u64, bytes: &mut [u8]) {
423 self.file.lock().unwrap().read(off, bytes);
424 }
425
426 fn write(&mut self, off: u64, bytes: &[u8]) {
427 self.file.lock().unwrap().write(off, bytes);
428 }
429
430 fn commit(&mut self, size: u64) {
431 self.file.lock().unwrap().commit(size);
432 }
433}
434
435pub struct AnyFileStorage {
437 filename: String,
438 files: Arc<Mutex<Vec<FileInner>>>,
439}
440
441impl AnyFileStorage {
442 pub fn new(filename: &str) -> Box<Self> {
444 Box::new(Self {
445 filename: filename.to_owned(),
446 files: Arc::new(Mutex::new(Vec::new())),
447 })
448 }
449
450 fn get_file(&self) -> FileInner {
451 match self.files.lock().unwrap().pop() {
452 Some(f) => f,
453 _ => FileInner::new(&self.filename),
454 }
455 }
456
457 fn put_file(&self, f: FileInner) {
458 self.files.lock().unwrap().push(f);
459 }
460}
461
462impl Storage for AnyFileStorage {
463 fn clone(&self) -> Box<dyn Storage> {
464 Box::new(Self {
465 filename: self.filename.clone(),
466 files: self.files.clone(),
467 })
468 }
469}
470
471impl BasicStorage for AnyFileStorage {
472 fn size(&self) -> u64 {
473 let mut f = self.get_file();
474 let result = f.size();
475 self.put_file(f);
476 result
477 }
478
479 fn read(&self, off: u64, bytes: &mut [u8]) {
480 let mut f = self.get_file();
481 f.read(off, bytes);
482 self.put_file(f);
483 }
484
485 fn write(&mut self, off: u64, bytes: &[u8]) {
486 let mut f = self.get_file();
487 f.write(off, bytes);
488 self.put_file(f);
489 }
490
491 fn commit(&mut self, size: u64) {
492 let mut f = self.get_file();
493 f.commit(size);
494 self.put_file(f);
495 }
496}
497
/// Storage that holds nothing: writes are discarded and the size is always 0.
pub struct DummyFile {}

impl DummyFile {
    /// Create a new `DummyFile`.
    pub fn new() -> Box<Self> {
        let dummy = DummyFile {};
        Box::new(dummy)
    }
}
506
impl Storage for DummyFile {
    /// Return a fresh `DummyFile`.
    fn clone(&self) -> Box<dyn Storage> {
        Self::new()
    }
}
512
impl BasicStorage for DummyFile {
    /// Always 0.
    fn size(&self) -> u64 {
        0
    }

    /// Does nothing; `_bytes` is left unchanged.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// Does nothing; the data is discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// Does nothing.
    fn commit(&mut self, _size: u64) {}
}
524
/// Memory and buffering limits used when constructing an [`AtomicFile`] or
/// [`BasicAtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Number of held commit-map entries above which `AtomicFile::commit`
    /// waits for outstanding commits to complete.
    pub map_lim: usize,
    /// Memory for the read buffer, in bytes (divided by the 256-byte sector
    /// size to get the maximum number of buffered sectors).
    pub rbuf_mem: usize,
    /// Size in bytes of the write buffer for the main storage.
    pub swbuf: usize,
    /// Size in bytes of the write buffer for the update storage.
    pub uwbuf: usize,
}

impl Default for Limits {
    /// Defaults: 5000 map entries, 2 MiB read buffer, 1 MiB for each write buffer.
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        Self {
            map_lim: 5000,
            rbuf_mem: 2 * MIB,
            swbuf: MIB,
            uwbuf: MIB,
        }
    }
}
548
549struct WriteBuffer {
551 ix: usize,
553 pos: u64,
555 pub stg: Box<dyn BasicStorage>,
557 buf: Vec<u8>,
559}
560
561impl WriteBuffer {
562 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
564 Self {
565 ix: 0,
566 pos: u64::MAX,
567 stg,
568 buf: vec![0; buf_size],
569 }
570 }
571
572 pub fn write(&mut self, off: u64, data: &[u8]) {
574 if self.pos + self.ix as u64 != off {
575 self.flush(off);
576 }
577 let mut done: usize = 0;
578 let mut todo: usize = data.len();
579 while todo > 0 {
580 let mut n: usize = self.buf.len() - self.ix;
581 if n == 0 {
582 self.flush(off + done as u64);
583 n = self.buf.len();
584 }
585 if n > todo {
586 n = todo;
587 }
588 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
589 todo -= n;
590 done += n;
591 self.ix += n;
592 }
593 }
594
595 fn flush(&mut self, new_pos: u64) {
596 if self.ix > 0 {
597 self.stg.write(self.pos, &self.buf[0..self.ix]);
598 }
599 self.ix = 0;
600 self.pos = new_pos;
601 }
602
603 pub fn commit(&mut self, size: u64) {
605 self.flush(u64::MAX);
606 self.stg.commit(size);
607 }
608
609 pub fn write_u64(&mut self, start: u64, value: u64) {
611 self.write(start, &value.to_le_bytes());
612 }
613}
614
615struct ReadBufStg<const N: usize> {
621 stg: Box<dyn Storage>,
623 buf: Mutex<ReadBuffer<N>>,
625 limit: usize,
627}
628
629impl<const N: usize> Drop for ReadBufStg<N> {
630 fn drop(&mut self) {
631 self.reset();
632 }
633}
634
635impl<const N: usize> ReadBufStg<N> {
636 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
638 Self {
639 stg,
640 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
641 limit,
642 }
643 }
644
645 fn reset(&mut self) {
647 self.buf.lock().unwrap().reset();
648 }
649}
650
651impl<const N: usize> BasicStorage for ReadBufStg<N> {
652 fn read(&self, start: u64, data: &mut [u8]) {
654 if data.len() <= self.limit {
655 self.buf.lock().unwrap().read(&*self.stg, start, data);
656 } else {
657 self.stg.read(start, data);
658 }
659 }
660
661 fn size(&self) -> u64 {
663 panic!()
664 }
665
666 fn write(&mut self, _start: u64, _data: &[u8]) {
668 panic!();
669 }
670
671 fn commit(&mut self, _size: u64) {
673 panic!();
674 }
675}
676
677struct ReadBuffer<const N: usize> {
678 map: HashMap<u64, Box<[u8; N]>>,
680 max_buf: usize,
682}
683
684impl<const N: usize> ReadBuffer<N> {
685 fn new(max_buf: usize) -> Self {
686 Self {
687 map: HashMap::default(),
688 max_buf,
689 }
690 }
691
692 fn reset(&mut self) {
693 self.map.clear();
694 }
695
696 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
697 let mut done = 0;
698 while done < data.len() {
699 let off = off + done as u64;
700 let sector = off / N as u64;
701 let disp = (off % N as u64) as usize;
702 let amount = min(data.len() - done, N - disp);
703
704 let p = self.map.entry(sector).or_insert_with(|| {
705 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
706 stg.read(sector * N as u64, &mut *p);
707 p
708 });
709 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
710 done += amount;
711 }
712 if self.map.len() >= self.max_buf {
713 self.reset();
714 }
715 }
716}
717
718#[derive(Default)]
719struct DataSlice {
721 pub data: Data,
723 pub off: usize,
725 pub len: usize,
727}
728
729impl DataSlice {
730 pub fn all(&self) -> &[u8] {
732 &self.data[self.off..self.off + self.len]
733 }
734 pub fn part(&self, off: usize, len: usize) -> &[u8] {
736 &self.data[self.off + off..self.off + off + len]
737 }
738 pub fn trim(&mut self, trim: usize) {
740 self.off += trim;
741 self.len -= trim;
742 }
743 #[allow(dead_code)]
745 pub fn take(&mut self) -> Data {
746 std::mem::take(&mut self.data)
747 }
748}
749
/// Map of pending writes.
///
/// Each entry is keyed by the END offset of the written range (start offset
/// plus length); the start is recovered as `key - slice.len`. `write` keeps the
/// stored ranges from overlapping by trimming, splitting or removing existing
/// entries.
#[derive(Default)]
struct WMap {
    /// End offset -> written bytes.
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    /// True if no writes are stored.
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of stored write ranges.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take all stored writes, returning them as `(start, slice)` pairs in map
    /// order. The map is left empty.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are end offsets; convert back to the start offset.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Replay every stored write into `stg` via `write_data`, sharing the
    /// underlying buffers rather than copying them.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    /// Record a write of `data[off..off + len]` at `start`, overriding any
    /// overlapping parts of earlier writes. A zero-length write is ignored.
    #[cfg(not(feature = "pstd"))]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            // Changes are collected first and applied after the range iteration ends.
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only entries whose end is beyond `start` can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                // `es..ee` is the range of the existing entry.
                let es = ee - v.len as u64;
                if es >= end {
                    // Existing entry starts at or after the end of the new write: no overlap.
                    break;
                } else if start <= es {
                    // Existing entry starts inside the new write.
                    if end < ee {
                        // It extends past the new write: keep only its tail.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // It is completely covered.
                    remove.push(ee);
                } else if end < ee {
                    // New write lies strictly inside the existing entry: keep a
                    // head piece (new entry) and the tail (existing entry, trimmed).
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // Existing entry starts before the new write and ends within
                    // it: keep only its head, re-inserted under a new key.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            // Finally record the new write itself, keyed by its end offset.
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Record a write of `data[off..off + len]` at `start`, overriding any
    /// overlapping parts of earlier writes. A zero-length write is ignored.
    ///
    /// Variant for the `pstd` map, which edits the map in place through a
    /// mutable cursor. NOTE(review): the exact cursor semantics
    /// (`lower_bound_mut`, `with_mutable_key`, `remove_prev`,
    /// `insert_before_unchecked`, `insert_after_unchecked`) come from the
    /// `pstd` crate and are not visible here.
    #[cfg(feature = "pstd")]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                // `es..ee` is the range of the existing entry.
                let es = ee - v.len as u64;
                if es >= end {
                    // No overlap: step back before this entry.
                    c.prev();
                    break;
                } else if start <= es {
                    // Existing entry starts inside the new write.
                    if end < ee {
                        // It extends past the new write: keep only its tail.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // It is completely covered.
                    c.remove_prev();
                } else if end < ee {
                    // New write lies strictly inside the existing entry: keep
                    // the tail in place and insert the head as a new entry.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // Existing entry ends within the new write: shorten it to
                    // its head and update its key (the end offset) in place.
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            // Record the new write itself, keyed by its end offset.
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Fill `data` with the bytes at offset `start`, taking bytes from stored
    /// writes where they exist and reading the gaps from `u`.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            // Number of bytes of `data` filled so far.
            let mut done = 0;
            // Visit stored writes that end after `start`, in offset order.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64;
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this stored write: read it from the underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Copy from the stored write, skipping any part before the current position.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Whatever remains lies after the last stored write.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
907
908pub struct BasicAtomicFile {
910 stg: WriteBuffer,
912 upd: WriteBuffer,
914 map: WMap,
916 list: Vec<(u64, DataSlice)>,
918 size: u64,
919}
920
921impl BasicAtomicFile {
922 pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
924 let size = stg.size();
925 let mut result = Box::new(Self {
926 stg: WriteBuffer::new(stg, lim.swbuf),
927 upd: WriteBuffer::new(upd, lim.uwbuf),
928 map: WMap::default(),
929 list: Vec::new(),
930 size,
931 });
932 result.init();
933 result
934 }
935
936 fn init(&mut self) {
938 let end = self.upd.stg.read_u64(0);
939 let size = self.upd.stg.read_u64(8);
940 if end == 0 {
941 return;
942 }
943 assert!(end == self.upd.stg.size());
944 let mut pos = 16;
945 while pos < end {
946 let start = self.upd.stg.read_u64(pos);
947 pos += 8;
948 let len = self.upd.stg.read_u64(pos);
949 pos += 8;
950 let mut buf = vec![0; len as usize];
951 self.upd.stg.read(pos, &mut buf);
952 pos += len;
953 self.stg.write(start, &buf);
954 }
955 self.stg.commit(size);
956 self.upd.commit(0);
957 }
958
959 pub fn commit_phase(&mut self, size: u64, phase: u8) {
961 if self.map.is_empty() && self.list.is_empty() {
962 return;
963 }
964 if phase == 1 {
965 self.list = self.map.convert_to_vec();
966
967 self.upd.write_u64(0, 0);
970 self.upd.write_u64(8, size);
971 self.upd.commit(16); let mut stg_written = false;
975 let mut pos: u64 = 16;
976 for (start, v) in self.list.iter() {
977 let (start, len, data) = (*start, v.len as u64, v.all());
978 if start >= self.size {
979 stg_written = true;
981 self.stg.write(start, data);
982 } else {
983 self.upd.write_u64(pos, start);
984 pos += 8;
985 self.upd.write_u64(pos, len);
986 pos += 8;
987 self.upd.write(pos, data);
988 pos += len;
989 }
990 }
991 if stg_written {
992 self.stg.commit(size);
993 }
994 self.upd.commit(pos); self.upd.write_u64(0, pos);
998 self.upd.write_u64(8, size);
999 self.upd.commit(pos);
1000 } else {
1001 for (start, v) in self.list.iter() {
1002 if *start < self.size {
1003 self.stg.write(*start, v.all());
1005 }
1006 }
1007 self.list.clear();
1008 self.stg.commit(size);
1009 self.upd.commit(0);
1010 }
1011 }
1012}
1013
1014impl BasicStorage for BasicAtomicFile {
1015 fn commit(&mut self, size: u64) {
1016 self.commit_phase(size, 1);
1017 self.commit_phase(size, 2);
1018 self.size = size;
1019 }
1020
1021 fn size(&self) -> u64 {
1022 self.size
1023 }
1024
1025 fn read(&self, start: u64, data: &mut [u8]) {
1026 self.map.read(start, data, &*self.stg.stg);
1027 }
1028
1029 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1030 self.map.write(start, data, off, len);
1031 }
1032
1033 fn write(&mut self, start: u64, data: &[u8]) {
1034 let len = data.len();
1035 let d = Arc::new(data.to_vec());
1036 self.write_data(start, d, 0, len);
1037 }
1038}
1039
/// File storage for Unix targets using positioned reads and writes.
///
/// The size is cached in a value shared between clones; it is set when the
/// file is opened and updated by `commit`.
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// Cached file size, shared between clones.
    size: Arc<Mutex<u64>>,
    /// The open file.
    f: fs::File,
}

#[cfg(target_family = "unix")]
impl UnixFileStorage {
    /// Open `filename` for reading and writing, creating it if it does not
    /// exist and keeping existing contents. Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        // Seeking to the end yields the current length of the file.
        let len = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1062
1063#[cfg(target_family = "unix")]
1064impl Storage for UnixFileStorage {
1065 fn clone(&self) -> Box<dyn Storage> {
1066 Box::new(Self {
1067 size: self.size.clone(),
1068 f: self.f.try_clone().unwrap(),
1069 })
1070 }
1071}
1072
1073#[cfg(target_family = "unix")]
1074use std::os::unix::fs::FileExt;
1075
1076#[cfg(target_family = "unix")]
1077impl BasicStorage for UnixFileStorage {
1078 fn read(&self, start: u64, data: &mut [u8]) {
1079 let _ = self.f.read_at(data, start);
1080 }
1081
1082 fn write(&mut self, start: u64, data: &[u8]) {
1083 let _ = self.f.write_at(data, start);
1084 }
1085
1086 fn size(&self) -> u64 {
1087 *self.size.lock().unwrap()
1088 }
1089
1090 fn commit(&mut self, size: u64) {
1091 *self.size.lock().unwrap() = size;
1092 self.f.set_len(size).unwrap();
1093 self.f.sync_all().unwrap();
1094 }
1095}
1096
/// File storage for Windows targets using positioned reads and writes.
///
/// The size is cached in a value shared between clones; it is set when the
/// file is opened and updated by `commit`.
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// Cached file size, shared between clones.
    size: Arc<Mutex<u64>>,
    /// The open file.
    f: fs::File,
}

#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Open `filename` for reading and writing, creating it if it does not
    /// exist and keeping existing contents. Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        // Seeking to the end yields the current length of the file.
        let len = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1119
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Return another handle: the file handle is duplicated with `try_clone`
    /// and the cached size is shared. Panics if duplication fails.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1129
1130#[cfg(target_family = "windows")]
1131use std::os::windows::fs::FileExt;
1132
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read into `data` from file offset `start`.
    ///
    /// `seek_read` may return fewer bytes than requested, so reading continues
    /// until `data` is full or end of file is reached. Bytes past end of file
    /// are left untouched. Panics on an I/O error.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // end of file
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage: read failed: {:?}", e),
            }
        }
    }

    /// Write all of `data` at file offset `start`. Panics on an I/O error.
    ///
    /// `seek_write` may write only part of the buffer, so writing continues
    /// until every byte has been written.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => panic!("WindowsFileStorage: write made no progress"),
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("WindowsFileStorage: write failed: {:?}", e),
            }
        }
    }

    /// The cached file size.
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record `size` as the cached size, set the file length to it and sync
    /// the file to disk.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1153
/// Storage type whose handles can be cloned: [`WindowsFileStorage`] on Windows.
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Storage type whose handles can be cloned: [`UnixFileStorage`] on Unix.
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Storage type whose handles can be cloned: [`AnyFileStorage`] on targets
/// that are neither Unix nor Windows.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Alias for [`MultiFileStorage`] on Windows and Unix.
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Alias for [`UpdFileStorage`] on targets that are neither Windows nor Unix.
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1173
/// Test-size multiplier, read from the `TA` environment variable (1 if the
/// variable is not set). Panics if the value is not a valid `usize`.
#[cfg(test)]
fn test_amount() -> usize {
    let raw = std::env::var("TA").unwrap_or_else(|_| "1".to_string());
    raw.parse().unwrap()
}
1179
/// Randomized test: performs the same sequence of writes, reads and commits on
/// an `AtomicFile` (over `MemFile`s) and on a plain `MemFile`, and checks that
/// every read returns the same bytes from both.
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Scale factor for the number of operations, from the `TA` environment variable.
    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        // s1 is the storage under test; s2 is the reference.
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Offsets 0..100 and lengths 1..=20 keep all accesses below the
            // committed size of 200.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            // Choose at random between a write and a read.
            let w: bool = rng.r#gen();
            if w {
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
            // Commit both storages on roughly one operation in fifty.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}