1#![deny(missing_docs)]
6
7use rustc_hash::FxHashMap as HashMap;
8use std::cell::Cell;
9use std::cmp::min;
10use std::sync::{Arc, Mutex, RwLock};
11
12#[cfg(feature = "pstd")]
13use pstd::collections::BTreeMap;
14#[cfg(not(feature = "pstd"))]
15use std::collections::BTreeMap;
16
17pub type Data = Arc<Vec<u8>>;
19
20pub struct AtomicFile {
37 map: WMap,
39 cf: Arc<RwLock<CommitFile>>,
41 size: u64,
43 tx: std::sync::mpsc::Sender<(u64, WMap)>,
45 busy: Arc<Mutex<()>>,
47 map_lim: usize,
49}
50
51impl AtomicFile {
52 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
54 Self::new_with_limits(stg, upd, &Limits::default())
55 }
56
57 pub fn new_with_limits(
59 stg: Box<dyn Storage>,
60 upd: Box<dyn BasicStorage>,
61 lim: &Limits,
62 ) -> Box<Self> {
63 let size = stg.size();
64 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
65
66 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
67 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
68 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
72
73 std::thread::spawn(move || {
74 while let Ok((size, map)) = rx.recv() {
76 let _lock = busy1.lock();
77 baf.map = map;
78 baf.commit(size);
79 cf1.write().unwrap().done_one();
80 }
81 });
82 Box::new(Self {
83 map: WMap::default(),
84 cf,
85 size,
86 tx,
87 busy,
88 map_lim: lim.map_lim,
89 })
90 }
91}
92
93impl Storage for AtomicFile {
94 fn clone(&self) -> Box<dyn Storage> {
95 panic!()
96 }
97}
98
99impl BasicStorage for AtomicFile {
100 fn commit(&mut self, size: u64) {
101 self.size = size;
102 if self.map.is_empty() {
103 return;
104 }
105 if self.cf.read().unwrap().map.len() > self.map_lim {
106 self.wait_complete();
107 }
108 let map = std::mem::take(&mut self.map);
109 let cf = &mut *self.cf.write().unwrap();
110 cf.todo += 1;
111 map.to_storage(cf);
113 self.tx.send((size, map)).unwrap();
115 }
116
117 fn size(&self) -> u64 {
118 self.size
119 }
120
121 fn read(&self, start: u64, data: &mut [u8]) {
122 self.map.read(start, data, &*self.cf.read().unwrap());
123 }
124
125 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
126 self.map.write(start, data, off, len);
127 }
128
129 fn write(&mut self, start: u64, data: &[u8]) {
130 let len = data.len();
131 let d = Arc::new(data.to_vec());
132 self.write_data(start, d, 0, len);
133 }
134
135 fn wait_complete(&self) {
136 while self.cf.read().unwrap().todo != 0 {
137 let _x = self.busy.lock();
138 }
139 }
140}
141
142struct CommitFile {
143 stg: ReadBufStg<256>,
145 map: WMap,
147 todo: usize,
149}
150
151impl CommitFile {
152 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
153 Self {
154 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
155 map: WMap::default(),
156 todo: 0,
157 }
158 }
159
160 fn done_one(&mut self) {
161 self.todo -= 1;
162 if self.todo == 0 {
163 self.map = WMap::default();
164 self.stg.reset();
165 }
166 }
167}
168
169impl BasicStorage for CommitFile {
170 fn commit(&mut self, _size: u64) {
171 panic!()
172 }
173
174 fn size(&self) -> u64 {
175 panic!()
176 }
177
178 fn read(&self, start: u64, data: &mut [u8]) {
179 self.map.read(start, data, &self.stg);
180 }
181
182 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
183 self.map.write(start, data, off, len);
184 }
185
186 fn write(&mut self, _start: u64, _data: &[u8]) {
187 panic!()
188 }
189}
190
191pub trait BasicStorage: Send {
195 fn size(&self) -> u64;
198
199 fn read(&self, start: u64, data: &mut [u8]);
201
202 fn write(&mut self, start: u64, data: &[u8]);
204
205 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
207 let len = data.len();
208 let d = Arc::new(data);
209 self.write_data(start, d, 0, len);
210 }
211
212 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
214 self.write(start, &data[off..off + len]);
215 }
216
217 fn commit(&mut self, size: u64);
219
220 fn write_u64(&mut self, start: u64, value: u64) {
222 self.write(start, &value.to_le_bytes());
223 }
224
225 fn read_u64(&self, start: u64) -> u64 {
227 let mut bytes = [0; 8];
228 self.read(start, &mut bytes);
229 u64::from_le_bytes(bytes)
230 }
231
232 fn wait_complete(&self) {}
234}
235
236pub trait Storage: BasicStorage + Sync {
238 fn clone(&self) -> Box<dyn Storage>;
240}
241
242#[derive(Default)]
244pub struct MemFile {
245 v: Arc<Mutex<Vec<u8>>>,
246}
247
248impl MemFile {
249 pub fn new() -> Box<Self> {
251 Box::default()
252 }
253}
254
255impl Storage for MemFile {
256 fn clone(&self) -> Box<dyn Storage> {
257 Box::new(Self { v: self.v.clone() })
258 }
259}
260
261impl BasicStorage for MemFile {
262 fn size(&self) -> u64 {
263 let v = self.v.lock().unwrap();
264 v.len() as u64
265 }
266
267 fn read(&self, off: u64, bytes: &mut [u8]) {
268 let off = off as usize;
269 let len = bytes.len();
270 let mut v = self.v.lock().unwrap();
271 if off + len > v.len() {
272 v.resize(off + len, 0);
273 }
274 bytes.copy_from_slice(&v[off..off + len]);
275 }
276
277 fn write(&mut self, off: u64, bytes: &[u8]) {
278 let off = off as usize;
279 let len = bytes.len();
280 let mut v = self.v.lock().unwrap();
281 if off + len > v.len() {
282 v.resize(off + len, 0);
283 }
284 v[off..off + len].copy_from_slice(bytes);
285 }
286
287 fn commit(&mut self, size: u64) {
288 let mut v = self.v.lock().unwrap();
289 v.resize(size as usize, 0);
290 }
291}
292
293use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
294
295struct FileInner {
296 f: fs::File,
297}
298
299impl FileInner {
300 pub fn new(filename: &str) -> Self {
302 Self {
303 f: OpenOptions::new()
304 .read(true)
305 .write(true)
306 .create(true)
307 .truncate(false)
308 .open(filename)
309 .unwrap(),
310 }
311 }
312
313 fn size(&mut self) -> u64 {
314 self.f.seek(SeekFrom::End(0)).unwrap()
315 }
316
317 fn read(&mut self, off: u64, bytes: &mut [u8]) {
318 self.f.seek(SeekFrom::Start(off)).unwrap();
319 let _ = self.f.read(bytes).unwrap();
320 }
321
322 fn write(&mut self, off: u64, bytes: &[u8]) {
323 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
325 {
326 let size = self.f.seek(SeekFrom::End(0)).unwrap();
327 if off > size {
328 self.f.set_len(off).unwrap();
329 }
330 }
331 self.f.seek(SeekFrom::Start(off)).unwrap();
332 let _ = self.f.write(bytes).unwrap();
333 }
334
335 fn commit(&mut self, size: u64) {
336 self.f.set_len(size).unwrap();
337 self.f.sync_all().unwrap();
338 }
339}
340
341pub struct UpdFileStorage {
343 file: Cell<Option<FileInner>>,
344}
345
346impl UpdFileStorage {
347 pub fn new(filename: &str) -> Box<Self> {
349 Box::new(Self {
350 file: Cell::new(Some(FileInner::new(filename))),
351 })
352 }
353}
354
355impl BasicStorage for UpdFileStorage {
356 fn size(&self) -> u64 {
357 let mut f = self.file.take().unwrap();
358 let result = f.size();
359 self.file.set(Some(f));
360 result
361 }
362 fn read(&self, off: u64, bytes: &mut [u8]) {
363 let mut f = self.file.take().unwrap();
364 f.read(off, bytes);
365 self.file.set(Some(f));
366 }
367
368 fn write(&mut self, off: u64, bytes: &[u8]) {
369 let mut f = self.file.take().unwrap();
370 f.write(off, bytes);
371 self.file.set(Some(f));
372 }
373
374 fn commit(&mut self, size: u64) {
375 let mut f = self.file.take().unwrap();
376 f.commit(size);
377 self.file.set(Some(f));
378 }
379}
380
381pub struct SimpleFileStorage {
383 file: Arc<Mutex<FileInner>>,
384}
385
386impl SimpleFileStorage {
387 pub fn new(filename: &str) -> Box<Self> {
389 Box::new(Self {
390 file: Arc::new(Mutex::new(FileInner::new(filename))),
391 })
392 }
393}
394
395impl Storage for SimpleFileStorage {
396 fn clone(&self) -> Box<dyn Storage> {
397 Box::new(Self {
398 file: self.file.clone(),
399 })
400 }
401}
402
403impl BasicStorage for SimpleFileStorage {
404 fn size(&self) -> u64 {
405 self.file.lock().unwrap().size()
406 }
407
408 fn read(&self, off: u64, bytes: &mut [u8]) {
409 self.file.lock().unwrap().read(off, bytes);
410 }
411
412 fn write(&mut self, off: u64, bytes: &[u8]) {
413 self.file.lock().unwrap().write(off, bytes);
414 }
415
416 fn commit(&mut self, size: u64) {
417 self.file.lock().unwrap().commit(size);
418 }
419}
420
421pub struct AnyFileStorage {
423 filename: String,
424 files: Arc<Mutex<Vec<FileInner>>>,
425}
426
427impl AnyFileStorage {
428 pub fn new(filename: &str) -> Box<Self> {
430 Box::new(Self {
431 filename: filename.to_owned(),
432 files: Arc::new(Mutex::new(Vec::new())),
433 })
434 }
435
436 fn get_file(&self) -> FileInner {
437 match self.files.lock().unwrap().pop() {
438 Some(f) => f,
439 _ => FileInner::new(&self.filename),
440 }
441 }
442
443 fn put_file(&self, f: FileInner) {
444 self.files.lock().unwrap().push(f);
445 }
446}
447
448impl Storage for AnyFileStorage {
449 fn clone(&self) -> Box<dyn Storage> {
450 Box::new(Self {
451 filename: self.filename.clone(),
452 files: self.files.clone(),
453 })
454 }
455}
456
457impl BasicStorage for AnyFileStorage {
458 fn size(&self) -> u64 {
459 let mut f = self.get_file();
460 let result = f.size();
461 self.put_file(f);
462 result
463 }
464
465 fn read(&self, off: u64, bytes: &mut [u8]) {
466 let mut f = self.get_file();
467 f.read(off, bytes);
468 self.put_file(f);
469 }
470
471 fn write(&mut self, off: u64, bytes: &[u8]) {
472 let mut f = self.get_file();
473 f.write(off, bytes);
474 self.put_file(f);
475 }
476
477 fn commit(&mut self, size: u64) {
478 let mut f = self.get_file();
479 f.commit(size);
480 self.put_file(f);
481 }
482}
483
484pub struct DummyFile {}
486impl DummyFile {
487 pub fn new() -> Box<Self> {
489 Box::new(Self {})
490 }
491}
492
493impl Storage for DummyFile {
494 fn clone(&self) -> Box<dyn Storage> {
495 Self::new()
496 }
497}
498
499impl BasicStorage for DummyFile {
500 fn size(&self) -> u64 {
501 0
502 }
503
504 fn read(&self, _off: u64, _bytes: &mut [u8]) {}
505
506 fn write(&mut self, _off: u64, _bytes: &[u8]) {}
507
508 fn commit(&mut self, _size: u64) {}
509}
510
511#[non_exhaustive]
513pub struct Limits {
514 pub map_lim: usize,
516 pub rbuf_mem: usize,
518 pub swbuf: usize,
520 pub uwbuf: usize,
522}
523
524impl Default for Limits {
525 fn default() -> Self {
526 Self {
527 map_lim: 5000,
528 rbuf_mem: 0x200000,
529 swbuf: 0x100000,
530 uwbuf: 0x100000,
531 }
532 }
533}
534
535struct WriteBuffer {
537 ix: usize,
539 pos: u64,
541 pub stg: Box<dyn BasicStorage>,
543 buf: Vec<u8>,
545}
546
547impl WriteBuffer {
548 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
550 Self {
551 ix: 0,
552 pos: u64::MAX,
553 stg,
554 buf: vec![0; buf_size],
555 }
556 }
557
558 pub fn write(&mut self, off: u64, data: &[u8]) {
560 if self.pos + self.ix as u64 != off {
561 self.flush(off);
562 }
563 let mut done: usize = 0;
564 let mut todo: usize = data.len();
565 while todo > 0 {
566 let mut n: usize = self.buf.len() - self.ix;
567 if n == 0 {
568 self.flush(off + done as u64);
569 n = self.buf.len();
570 }
571 if n > todo {
572 n = todo;
573 }
574 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
575 todo -= n;
576 done += n;
577 self.ix += n;
578 }
579 }
580
581 fn flush(&mut self, new_pos: u64) {
582 if self.ix > 0 {
583 self.stg.write(self.pos, &self.buf[0..self.ix]);
584 }
585 self.ix = 0;
586 self.pos = new_pos;
587 }
588
589 pub fn commit(&mut self, size: u64) {
591 self.flush(u64::MAX);
592 self.stg.commit(size);
593 }
594
595 pub fn write_u64(&mut self, start: u64, value: u64) {
597 self.write(start, &value.to_le_bytes());
598 }
599}
600
601struct ReadBufStg<const N: usize> {
607 stg: Box<dyn Storage>,
609 buf: Mutex<ReadBuffer<N>>,
611 limit: usize,
613}
614
615impl<const N: usize> Drop for ReadBufStg<N> {
616 fn drop(&mut self) {
617 self.reset();
618 }
619}
620
621impl<const N: usize> ReadBufStg<N> {
622 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
624 Self {
625 stg,
626 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
627 limit,
628 }
629 }
630
631 fn reset(&mut self) {
633 self.buf.lock().unwrap().reset();
634 }
635}
636
637impl<const N: usize> BasicStorage for ReadBufStg<N> {
638 fn read(&self, start: u64, data: &mut [u8]) {
640 if data.len() <= self.limit {
641 self.buf.lock().unwrap().read(&*self.stg, start, data);
642 } else {
643 self.stg.read(start, data);
644 }
645 }
646
647 fn size(&self) -> u64 {
649 panic!()
650 }
651
652 fn write(&mut self, _start: u64, _data: &[u8]) {
654 panic!();
655 }
656
657 fn commit(&mut self, _size: u64) {
659 panic!();
660 }
661}
662
663struct ReadBuffer<const N: usize> {
664 map: HashMap<u64, Box<[u8; N]>>,
666 max_buf: usize,
668}
669
670impl<const N: usize> ReadBuffer<N> {
671 fn new(max_buf: usize) -> Self {
672 Self {
673 map: HashMap::default(),
674 max_buf,
675 }
676 }
677
678 fn reset(&mut self) {
679 self.map.clear();
680 }
681
682 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
683 let mut done = 0;
684 while done < data.len() {
685 let off = off + done as u64;
686 let sector = off / N as u64;
687 let disp = (off % N as u64) as usize;
688 let amount = min(data.len() - done, N - disp);
689
690 let p = self.map.entry(sector).or_insert_with(|| {
691 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
692 stg.read(sector * N as u64, &mut *p);
693 p
694 });
695 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
696 done += amount;
697 }
698 if self.map.len() >= self.max_buf {
699 self.reset();
700 }
701 }
702}
703
704#[derive(Default)]
705struct DataSlice {
707 pub data: Data,
709 pub off: usize,
711 pub len: usize,
713}
714
715impl DataSlice {
716 pub fn all(&self) -> &[u8] {
718 &self.data[self.off..self.off + self.len]
719 }
720 pub fn part(&self, off: usize, len: usize) -> &[u8] {
722 &self.data[self.off + off..self.off + off + len]
723 }
724 pub fn trim(&mut self, trim: usize) {
726 self.off += trim;
727 self.len -= trim;
728 }
729 #[allow(dead_code)]
731 pub fn take(&mut self) -> Data {
732 std::mem::take(&mut self.data)
733 }
734}
735
736#[derive(Default)]
737struct WMap {
739 map: BTreeMap<u64, DataSlice>,
741}
742
743impl WMap {
744 pub fn is_empty(&self) -> bool {
746 self.map.is_empty()
747 }
748
749 pub fn len(&self) -> usize {
751 self.map.len()
752 }
753
754 pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
756 let map = std::mem::take(&mut self.map);
757 let mut result = Vec::with_capacity(map.len());
758 for (end, v) in map {
759 let start = end - v.len as u64;
760 result.push((start, v));
761 }
762 result
763 }
764
765 pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
767 for (end, v) in self.map.iter() {
768 let start = end - v.len as u64;
769 stg.write_data(start, v.data.clone(), v.off, v.len);
770 }
771 }
772
773 #[cfg(not(feature = "pstd"))]
774 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
776 if len != 0 {
777 let (mut insert, mut remove) = (Vec::new(), Vec::new());
778 let end = start + len as u64;
779 for (ee, v) in self.map.range_mut(start + 1..) {
780 let ee = *ee;
781 let es = ee - v.len as u64; if es >= end {
783 break;
785 } else if start <= es {
786 if end < ee {
787 v.trim((end - es) as usize);
789 break;
790 }
791 remove.push(ee);
793 } else if end < ee {
794 insert.push((es, v.data.clone(), v.off, (start - es) as usize));
797 v.trim((end - es) as usize);
798 break;
799 } else {
800 insert.push((es, v.take(), v.off, (start - es) as usize));
803 remove.push(ee);
804 }
805 }
806 for end in remove {
807 self.map.remove(&end);
808 }
809 for (start, data, off, len) in insert {
810 self.map
811 .insert(start + len as u64, DataSlice { data, off, len });
812 }
813 self.map
814 .insert(start + len as u64, DataSlice { data, off, len });
815 }
816 }
817
818 #[cfg(feature = "pstd")]
819 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
821 if len != 0 {
822 let end = start + len as u64;
823 let mut c = self
824 .map
825 .lower_bound_mut(std::ops::Bound::Excluded(&start))
826 .with_mutable_key();
827 while let Some((eend, v)) = c.next() {
828 let ee = *eend;
829 let es = ee - v.len as u64; if es >= end {
831 c.prev();
833 break;
834 } else if start <= es {
835 if end < ee {
836 v.trim((end - es) as usize);
838 c.prev();
839 break;
840 }
841 c.remove_prev();
843 } else if end < ee {
844 let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
847 v.trim((end - es) as usize);
848 c.prev();
849 c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
850 break;
851 } else {
852 v.len = (start - es) as usize;
855 *eend = es + v.len as u64;
856 }
857 }
858 c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
860 }
861 }
862
863 pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
865 let len = data.len();
866 if len != 0 {
867 let mut done = 0;
868 for (&end, v) in self.map.range(start + 1..) {
869 let es = end - v.len as u64; let doff = start + done as u64;
871 if es > doff {
872 let a = min(len - done, (es - doff) as usize);
874 u.read(doff, &mut data[done..done + a]);
875 done += a;
876 if done == len {
877 return;
878 }
879 }
880 let skip = (start + done as u64 - es) as usize;
882 let a = min(len - done, v.len - skip);
883 data[done..done + a].copy_from_slice(v.part(skip, a));
884 done += a;
885 if done == len {
886 return;
887 }
888 }
889 u.read(start + done as u64, &mut data[done..]);
890 }
891 }
892}
893
894pub struct BasicAtomicFile {
896 stg: WriteBuffer,
898 upd: WriteBuffer,
900 map: WMap,
902 list: Vec<(u64, DataSlice)>,
904 size: u64,
905}
906
907impl BasicAtomicFile {
908 pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
910 let size = stg.size();
911 let mut result = Box::new(Self {
912 stg: WriteBuffer::new(stg, lim.swbuf),
913 upd: WriteBuffer::new(upd, lim.uwbuf),
914 map: WMap::default(),
915 list: Vec::new(),
916 size,
917 });
918 result.init();
919 result
920 }
921
922 fn init(&mut self) {
924 let end = self.upd.stg.read_u64(0);
925 let size = self.upd.stg.read_u64(8);
926 if end == 0 {
927 return;
928 }
929 assert!(end == self.upd.stg.size());
930 let mut pos = 16;
931 while pos < end {
932 let start = self.upd.stg.read_u64(pos);
933 pos += 8;
934 let len = self.upd.stg.read_u64(pos);
935 pos += 8;
936 let mut buf = vec![0; len as usize];
937 self.upd.stg.read(pos, &mut buf);
938 pos += len;
939 self.stg.write(start, &buf);
940 }
941 self.stg.commit(size);
942 self.upd.commit(0);
943 }
944
945 pub fn commit_phase(&mut self, size: u64, phase: u8) {
947 if self.map.is_empty() && self.list.is_empty() {
948 return;
949 }
950 if phase == 1 {
951 self.list = self.map.convert_to_vec();
952
953 self.upd.write_u64(0, 0);
956 self.upd.write_u64(8, size);
957 self.upd.commit(16); let mut stg_written = false;
961 let mut pos: u64 = 16;
962 for (start, v) in self.list.iter() {
963 let (start, len, data) = (*start, v.len as u64, v.all());
964 if start >= self.size {
965 stg_written = true;
967 self.stg.write(start, data);
968 } else {
969 self.upd.write_u64(pos, start);
970 pos += 8;
971 self.upd.write_u64(pos, len);
972 pos += 8;
973 self.upd.write(pos, data);
974 pos += len;
975 }
976 }
977 if stg_written {
978 self.stg.commit(size);
979 }
980 self.upd.commit(pos); self.upd.write_u64(0, pos);
984 self.upd.write_u64(8, size);
985 self.upd.commit(pos);
986 } else {
987 for (start, v) in self.list.iter() {
988 if *start < self.size {
989 self.stg.write(*start, v.all());
991 }
992 }
993 self.list.clear();
994 self.stg.commit(size);
995 self.upd.commit(0);
996 }
997 }
998}
999
1000impl BasicStorage for BasicAtomicFile {
1001 fn commit(&mut self, size: u64) {
1002 self.commit_phase(size, 1);
1003 self.commit_phase(size, 2);
1004 self.size = size;
1005 }
1006
1007 fn size(&self) -> u64 {
1008 self.size
1009 }
1010
1011 fn read(&self, start: u64, data: &mut [u8]) {
1012 self.map.read(start, data, &*self.stg.stg);
1013 }
1014
1015 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1016 self.map.write(start, data, off, len);
1017 }
1018
1019 fn write(&mut self, start: u64, data: &[u8]) {
1020 let len = data.len();
1021 let d = Arc::new(data.to_vec());
1022 self.write_data(start, d, 0, len);
1023 }
1024}
1025
1026#[cfg(target_family = "unix")]
1028pub struct UnixFileStorage {
1029 size: Arc<Mutex<u64>>,
1030 f: fs::File,
1031}
1032#[cfg(target_family = "unix")]
1033impl UnixFileStorage {
1034 pub fn new(filename: &str) -> Box<Self> {
1036 let mut f = OpenOptions::new()
1037 .read(true)
1038 .write(true)
1039 .create(true)
1040 .truncate(false)
1041 .open(filename)
1042 .unwrap();
1043 let size = f.seek(SeekFrom::End(0)).unwrap();
1044 let size = Arc::new(Mutex::new(size));
1045 Box::new(Self { size, f })
1046 }
1047}
1048
1049#[cfg(target_family = "unix")]
1050impl Storage for UnixFileStorage {
1051 fn clone(&self) -> Box<dyn Storage> {
1052 Box::new(Self {
1053 size: self.size.clone(),
1054 f: self.f.try_clone().unwrap(),
1055 })
1056 }
1057}
1058
1059#[cfg(target_family = "unix")]
1060use std::os::unix::fs::FileExt;
1061
1062#[cfg(target_family = "unix")]
1063impl BasicStorage for UnixFileStorage {
1064 fn read(&self, start: u64, data: &mut [u8]) {
1065 let _ = self.f.read_at(data, start);
1066 }
1067
1068 fn write(&mut self, start: u64, data: &[u8]) {
1069 let _ = self.f.write_at(data, start);
1070 }
1071
1072 fn size(&self) -> u64 {
1073 *self.size.lock().unwrap()
1074 }
1075
1076 fn commit(&mut self, size: u64) {
1077 *self.size.lock().unwrap() = size;
1078 self.f.set_len(size).unwrap();
1079 self.f.sync_all().unwrap();
1080 }
1081}
1082
1083#[cfg(target_family = "windows")]
1085pub struct WindowsFileStorage {
1086 size: Arc<Mutex<u64>>,
1087 f: fs::File,
1088}
1089#[cfg(target_family = "windows")]
1090impl WindowsFileStorage {
1091 pub fn new(filename: &str) -> Box<Self> {
1093 let mut f = OpenOptions::new()
1094 .read(true)
1095 .write(true)
1096 .create(true)
1097 .truncate(false)
1098 .open(filename)
1099 .unwrap();
1100 let size = f.seek(SeekFrom::End(0)).unwrap();
1101 let size = Arc::new(Mutex::new(size));
1102 Box::new(Self { size, f })
1103 }
1104}
1105
1106#[cfg(target_family = "windows")]
1107impl Storage for WindowsFileStorage {
1108 fn clone(&self) -> Box<dyn Storage> {
1109 Box::new(Self {
1110 size: self.size.clone(),
1111 f: self.f.try_clone().unwrap(),
1112 })
1113 }
1114}
1115
1116#[cfg(target_family = "windows")]
1117use std::os::windows::fs::FileExt;
1118
1119#[cfg(target_family = "windows")]
1120impl BasicStorage for WindowsFileStorage {
1121 fn read(&self, start: u64, data: &mut [u8]) {
1122 let _ = self.f.seek_read(data, start);
1123 }
1124
1125 fn write(&mut self, start: u64, data: &[u8]) {
1126 let _ = self.f.seek_write(data, start);
1127 }
1128
1129 fn size(&self) -> u64 {
1130 *self.size.lock().unwrap()
1131 }
1132
1133 fn commit(&mut self, size: u64) {
1134 *self.size.lock().unwrap() = size;
1135 self.f.set_len(size).unwrap();
1136 self.f.sync_all().unwrap();
1137 }
1138}
1139
1140#[cfg(target_family = "windows")]
1142pub type MultiFileStorage = WindowsFileStorage;
1143
1144#[cfg(target_family = "unix")]
1146pub type MultiFileStorage = UnixFileStorage;
1147
1148#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1150pub type MultiFileStorage = AnyFileStorage;
1151
1152#[cfg(any(target_family = "windows", target_family = "unix"))]
1154pub type FastFileStorage = MultiFileStorage;
1155
1156#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1158pub type FastFileStorage = UpdFileStorage;
1159
1160#[cfg(test)]
1161fn test_amount() -> usize {
1163 str::parse(&std::env::var("TA").unwrap_or("1".to_string())).unwrap()
1164}
1165
1166#[test]
1167fn test_atomic_file() {
1168 use rand::Rng;
1169 let ta = test_amount();
1172 println!(" Test amount={}", ta);
1173
1174 let mut rng = rand::thread_rng();
1175
1176 for _ in 0..100 {
1177 let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
1178 let mut s2 = MemFile::new();
1180
1181 for _ in 0..1000 * ta {
1182 let off: usize = rng.r#gen::<usize>() % 100;
1183 let mut len = 1 + rng.r#gen::<usize>() % 20;
1184 let w: bool = rng.r#gen();
1185 if w {
1186 let mut bytes = Vec::new();
1187 while len > 0 {
1188 len -= 1;
1189 let b: u8 = rng.r#gen::<u8>();
1190 bytes.push(b);
1191 }
1192 s1.write(off as u64, &bytes);
1193 s2.write(off as u64, &bytes);
1194 } else {
1195 let mut b2 = vec![0; len];
1196 let mut b3 = vec![0; len];
1197 s1.read(off as u64, &mut b2);
1198 s2.read(off as u64, &mut b3);
1199 assert!(b2 == b3);
1200 }
1201 if rng.r#gen::<usize>() % 50 == 0 {
1202 s1.commit(200);
1203 s2.commit(200);
1204 }
1205 }
1206 }
1207}