1#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24 VecA,
25 collections::{BTreeMapA, btree_map::CustomTuning},
26 localalloc::GTemp,
27 veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
/// Ordered map type used by this file when the `pstd` feature is enabled:
/// the `pstd` B-tree map with `CustomTuning<GTemp>`.
#[cfg(feature = "pstd")]
type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;

/// Growable vector type used by this file when the `pstd` feature is enabled:
/// the `pstd` vector `VecA` parameterised with `GTemp`.
#[cfg(feature = "pstd")]
type GVec<T> = VecA<T, GTemp>;
38
/// Reference-counted byte vector; cloning shares the same bytes rather than copying them.
pub type Data = Arc<Vec<u8>>;
41
/// Storage wrapper that buffers writes in memory and hands each commit to a background thread.
///
/// Writes accumulate in `map`. `commit` copies them into the shared commit file and sends them
/// over `tx` to the thread spawned by `new_with_limits`, which applies them through a
/// `BasicAtomicFile`.
pub struct AtomicFile {
    /// Writes made since the last commit.
    map: WMap,
    /// Committed writes layered over the underlying storage; shared with the background thread.
    cf: Arc<RwLock<CommitFile>>,
    /// Size reported by `size()`: the size passed to the most recent commit.
    size: u64,
    /// Sends `(size, map)` pairs to the background thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Mutex held by the background thread while it processes a commit.
    busy: Arc<Mutex<()>>,
    /// When the commit file's map has more entries than this, `commit` first waits for
    /// outstanding work to finish.
    map_lim: usize,
}
72
73impl AtomicFile {
74 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76 Self::new_with_limits(stg, upd, &Limits::default())
77 }
78
79 pub fn new_with_limits(
81 stg: Box<dyn Storage>,
82 upd: Box<dyn BasicStorage>,
83 lim: &Limits,
84 ) -> Box<Self> {
85 let size = stg.size();
86 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
94
95 std::thread::spawn(move || {
96 while let Ok((size, map)) = rx.recv() {
98 let _lock = busy1.lock();
99 baf.map = map;
100 baf.commit(size);
101 cf1.write().unwrap().done_one();
102 }
103 });
104 Box::new(Self {
105 map: WMap::default(),
106 cf,
107 size,
108 tx,
109 busy,
110 map_lim: lim.map_lim,
111 })
112 }
113}
114
impl Storage for AtomicFile {
    /// Not supported for `AtomicFile`: always panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
120
121impl BasicStorage for AtomicFile {
122 fn commit(&mut self, size: u64) {
123 self.size = size;
124 if self.map.is_empty() {
125 return;
126 }
127 if self.cf.read().unwrap().map.len() > self.map_lim {
128 self.wait_complete();
129 }
130 let map = std::mem::take(&mut self.map);
131 let cf = &mut *self.cf.write().unwrap();
132 if cf.stop {
133 return;
134 } cf.todo += 1;
136 map.to_storage(cf);
138 self.tx.send((size, map)).unwrap();
140 }
141
142 fn size(&self) -> u64 {
143 self.size
144 }
145
146 fn read(&self, start: u64, data: &mut [u8]) {
147 self.map.read(start, data, &*self.cf.read().unwrap());
148 }
149
150 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
151 self.map.write(start, data, off, len);
152 }
153
154 fn write(&mut self, start: u64, data: &[u8]) {
155 let len = data.len();
156 let d = Arc::new(data.to_vec());
157 self.write_data(start, d, 0, len);
158 }
159
160 fn wait_complete(&self) {
161 while self.cf.read().unwrap().busy() {
162 let _x = self.busy.lock();
163 }
164 }
165
166 fn shutdown(&mut self) {
167 self.cf.write().unwrap().stop = true; while self.cf.read().unwrap().todo != 0 {
169 let _x = self.busy.lock();
170 }
171 }
172}
173
/// State shared between an `AtomicFile` and its background commit thread.
struct CommitFile {
    /// Buffered read access to the underlying storage.
    stg: ReadBufStg<256>,
    /// Committed writes; replaced with an empty map when `todo` drops to zero.
    map: WMap,
    /// Number of commits handed to the background thread that have not finished.
    todo: usize,
    /// Set by `AtomicFile::shutdown`; once set, `AtomicFile::commit` discards further commits.
    stop: bool,
}
184
185impl CommitFile {
186 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
187 Self {
188 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
189 map: WMap::default(),
190 todo: 0,
191 stop: false,
192 }
193 }
194
195 fn done_one(&mut self) {
196 self.todo -= 1;
197 if self.todo == 0 {
198 self.map = WMap::default();
199 self.stg.reset();
200 }
201 }
202
203 fn busy(&self) -> bool {
204 self.todo != 0 || self.stop
205 }
206}
207
impl BasicStorage for CommitFile {
    /// Not supported: always panics.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported: always panics.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Read from the committed map, falling back to the buffered underlying storage.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &self.stg);
    }

    /// Record `data[off..off + len]` at `start` in the committed map.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported: always panics.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
229
230pub trait BasicStorage: Send {
234 fn size(&self) -> u64;
237
238 fn read(&self, start: u64, data: &mut [u8]);
240
241 fn write(&mut self, start: u64, data: &[u8]);
243
244 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
246 let len = data.len();
247 let d = Arc::new(data);
248 self.write_data(start, d, 0, len);
249 }
250
251 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
253 self.write(start, &data[off..off + len]);
254 }
255
256 fn commit(&mut self, size: u64);
258
259 fn write_u64(&mut self, start: u64, value: u64) {
261 self.write(start, &value.to_le_bytes());
262 }
263
264 fn read_u64(&self, start: u64) -> u64 {
266 let mut bytes = [0; 8];
267 self.read(start, &mut bytes);
268 u64::from_le_bytes(bytes)
269 }
270
271 fn wait_complete(&self) {}
273
274 fn shutdown(&mut self) {}
276}
277
/// A [`BasicStorage`] that is also `Sync` and can produce another boxed handle via `clone`.
pub trait Storage: BasicStorage + Sync {
    /// Return a new boxed `Storage` handle.
    fn clone(&self) -> Box<dyn Storage>;
}
283
/// Storage backed by an in-memory byte vector shared behind a mutex.
#[derive(Default)]
pub struct MemFile {
    /// The stored bytes.
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Create a new, empty `MemFile`.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
296
297impl Storage for MemFile {
298 fn clone(&self) -> Box<dyn Storage> {
299 Box::new(Self { v: self.v.clone() })
300 }
301}
302
303impl BasicStorage for MemFile {
304 fn size(&self) -> u64 {
305 let v = self.v.lock().unwrap();
306 v.len() as u64
307 }
308
309 fn read(&self, off: u64, bytes: &mut [u8]) {
310 let off = off as usize;
311 let len = bytes.len();
312 let mut v = self.v.lock().unwrap();
313 if off + len > v.len() {
314 v.resize(off + len, 0);
315 }
316 bytes.copy_from_slice(&v[off..off + len]);
317 }
318
319 fn write(&mut self, off: u64, bytes: &[u8]) {
320 let off = off as usize;
321 let len = bytes.len();
322 let mut v = self.v.lock().unwrap();
323 if off + len > v.len() {
324 v.resize(off + len, 0);
325 }
326 v[off..off + len].copy_from_slice(bytes);
327 }
328
329 fn commit(&mut self, size: u64) {
330 let mut v = self.v.lock().unwrap();
331 v.resize(size as usize, 0);
332 }
333}
334
335use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
336
/// A single open file handle with seek-based read/write helpers.
struct FileInner {
    /// The open file.
    f: fs::File,
}

impl FileInner {
    /// Open `filename` for reading and writing, creating it if it does not exist.
    ///
    /// Existing contents are kept. Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Current length of the file, found by seeking to its end.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read into `bytes` starting at offset `off`.
    ///
    /// Keeps reading until `bytes` is full or end of file is reached, because a single `read`
    /// call may return fewer bytes than requested. Bytes past end of file are left untouched.
    /// Panics on an I/O error.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            match self.f.read(&mut bytes[done..]) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file read failed: {:?}", e),
            }
        }
    }

    /// Write all of `bytes` at offset `off`. Panics on an I/O error.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // On targets other than Windows and Linux, extend the file explicitly before writing
        // beyond its current end.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // `write_all` retries partial writes; a bare `write` may store only a prefix.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to `size` and sync data and metadata.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
382
383pub struct UpdFileStorage {
385 file: Cell<Option<FileInner>>,
386}
387
388impl UpdFileStorage {
389 pub fn new(filename: &str) -> Box<Self> {
391 Box::new(Self {
392 file: Cell::new(Some(FileInner::new(filename))),
393 })
394 }
395}
396
397impl BasicStorage for UpdFileStorage {
398 fn size(&self) -> u64 {
399 let mut f = self.file.take().unwrap();
400 let result = f.size();
401 self.file.set(Some(f));
402 result
403 }
404 fn read(&self, off: u64, bytes: &mut [u8]) {
405 let mut f = self.file.take().unwrap();
406 f.read(off, bytes);
407 self.file.set(Some(f));
408 }
409
410 fn write(&mut self, off: u64, bytes: &[u8]) {
411 let mut f = self.file.take().unwrap();
412 f.write(off, bytes);
413 self.file.set(Some(f));
414 }
415
416 fn commit(&mut self, size: u64) {
417 let mut f = self.file.take().unwrap();
418 f.commit(size);
419 self.file.set(Some(f));
420 }
421}
422
423pub struct SimpleFileStorage {
425 file: Arc<Mutex<FileInner>>,
426}
427
428impl SimpleFileStorage {
429 pub fn new(filename: &str) -> Box<Self> {
431 Box::new(Self {
432 file: Arc::new(Mutex::new(FileInner::new(filename))),
433 })
434 }
435}
436
437impl Storage for SimpleFileStorage {
438 fn clone(&self) -> Box<dyn Storage> {
439 Box::new(Self {
440 file: self.file.clone(),
441 })
442 }
443}
444
445impl BasicStorage for SimpleFileStorage {
446 fn size(&self) -> u64 {
447 self.file.lock().unwrap().size()
448 }
449
450 fn read(&self, off: u64, bytes: &mut [u8]) {
451 self.file.lock().unwrap().read(off, bytes);
452 }
453
454 fn write(&mut self, off: u64, bytes: &[u8]) {
455 self.file.lock().unwrap().write(off, bytes);
456 }
457
458 fn commit(&mut self, size: u64) {
459 self.file.lock().unwrap().commit(size);
460 }
461}
462
463pub struct AnyFileStorage {
465 filename: String,
466 files: Arc<Mutex<Vec<FileInner>>>,
467}
468
469impl AnyFileStorage {
470 pub fn new(filename: &str) -> Box<Self> {
472 Box::new(Self {
473 filename: filename.to_owned(),
474 files: Arc::new(Mutex::new(Vec::new())),
475 })
476 }
477
478 fn get_file(&self) -> FileInner {
479 match self.files.lock().unwrap().pop() {
480 Some(f) => f,
481 _ => FileInner::new(&self.filename),
482 }
483 }
484
485 fn put_file(&self, f: FileInner) {
486 self.files.lock().unwrap().push(f);
487 }
488}
489
490impl Storage for AnyFileStorage {
491 fn clone(&self) -> Box<dyn Storage> {
492 Box::new(Self {
493 filename: self.filename.clone(),
494 files: self.files.clone(),
495 })
496 }
497}
498
499impl BasicStorage for AnyFileStorage {
500 fn size(&self) -> u64 {
501 let mut f = self.get_file();
502 let result = f.size();
503 self.put_file(f);
504 result
505 }
506
507 fn read(&self, off: u64, bytes: &mut [u8]) {
508 let mut f = self.get_file();
509 f.read(off, bytes);
510 self.put_file(f);
511 }
512
513 fn write(&mut self, off: u64, bytes: &[u8]) {
514 let mut f = self.get_file();
515 f.write(off, bytes);
516 self.put_file(f);
517 }
518
519 fn commit(&mut self, size: u64) {
520 let mut f = self.get_file();
521 f.commit(size);
522 self.put_file(f);
523 }
524}
525
/// Storage that stores nothing: writes are discarded and reads leave the buffer untouched.
pub struct DummyFile {}

impl DummyFile {
    /// Create a new `DummyFile`.
    pub fn new() -> Box<Self> {
        Box::new(DummyFile {})
    }
}
534
535impl Storage for DummyFile {
536 fn clone(&self) -> Box<dyn Storage> {
537 Self::new()
538 }
539}
540
impl BasicStorage for DummyFile {
    /// Always zero.
    fn size(&self) -> u64 {
        0
    }

    /// Does nothing; `_bytes` is left unchanged.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// Does nothing; the data is discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// Does nothing.
    fn commit(&mut self, _size: u64) {}
}
552
/// Memory and buffering limits used when constructing atomic files.
#[non_exhaustive]
pub struct Limits {
    /// `AtomicFile::commit` waits for outstanding work once the commit map has more entries
    /// than this.
    pub map_lim: usize,
    /// Memory for the read buffer, in bytes (divided by the 256-byte sector size).
    pub rbuf_mem: usize,
    /// Size in bytes of the write buffer for the main storage.
    pub swbuf: usize,
    /// Size in bytes of the write buffer for the update storage.
    pub uwbuf: usize,
}

impl Default for Limits {
    /// 5000 map entries, 2 MiB of read buffer, and 1 MiB for each write buffer.
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        Self {
            map_lim: 5000,
            rbuf_mem: 2 * MIB,
            swbuf: MIB,
            uwbuf: MIB,
        }
    }
}
576
/// Write buffer that gathers sequential writes before passing them to the underlying storage.
struct WriteBuffer {
    /// Number of bytes currently held in `buf`.
    ix: usize,
    /// Storage offset of the first byte in `buf`; starts as `u64::MAX` and is reset to it by
    /// `commit`.
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn BasicStorage>,
    /// The buffer itself.
    buf: Vec<u8>,
}
588
589impl WriteBuffer {
590 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
592 Self {
593 ix: 0,
594 pos: u64::MAX,
595 stg,
596 buf: vec![0; buf_size],
597 }
598 }
599
600 pub fn write(&mut self, off: u64, data: &[u8]) {
602 if self.pos + self.ix as u64 != off {
603 self.flush(off);
604 }
605 let mut done: usize = 0;
606 let mut todo: usize = data.len();
607 while todo > 0 {
608 let mut n: usize = self.buf.len() - self.ix;
609 if n == 0 {
610 self.flush(off + done as u64);
611 n = self.buf.len();
612 }
613 if n > todo {
614 n = todo;
615 }
616 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
617 todo -= n;
618 done += n;
619 self.ix += n;
620 }
621 }
622
623 fn flush(&mut self, new_pos: u64) {
624 if self.ix > 0 {
625 self.stg.write(self.pos, &self.buf[0..self.ix]);
626 }
627 self.ix = 0;
628 self.pos = new_pos;
629 }
630
631 pub fn commit(&mut self, size: u64) {
633 self.flush(u64::MAX);
634 self.stg.commit(size);
635 }
636
637 pub fn write_u64(&mut self, start: u64, value: u64) {
639 self.write(start, &value.to_le_bytes());
640 }
641}
642
/// Read-only storage wrapper that serves small reads from a cache of `N`-byte sectors.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Sector cache.
    buf: Mutex<ReadBuffer<N>>,
    /// Reads longer than this many bytes bypass the cache.
    limit: usize,
}
656
impl<const N: usize> Drop for ReadBufStg<N> {
    /// Clear the sector cache when the wrapper is dropped.
    fn drop(&mut self) {
        self.reset();
    }
}
662
663impl<const N: usize> ReadBufStg<N> {
664 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
666 Self {
667 stg,
668 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
669 limit,
670 }
671 }
672
673 fn reset(&mut self) {
675 self.buf.lock().unwrap().reset();
676 }
677}
678
679impl<const N: usize> BasicStorage for ReadBufStg<N> {
680 fn read(&self, start: u64, data: &mut [u8]) {
682 if data.len() <= self.limit {
683 self.buf.lock().unwrap().read(&*self.stg, start, data);
684 } else {
685 self.stg.read(start, data);
686 }
687 }
688
689 fn size(&self) -> u64 {
691 panic!()
692 }
693
694 fn write(&mut self, _start: u64, _data: &[u8]) {
696 panic!();
697 }
698
699 fn commit(&mut self, _size: u64) {
701 panic!();
702 }
703}
704
/// Cache of `N`-byte sectors read from storage.
struct ReadBuffer<const N: usize> {
    /// Maps sector number (offset divided by `N`) to the sector's bytes.
    map: HashMap<u64, Box<[u8; N]>>,
    /// Number of cached sectors at which the cache is cleared.
    max_buf: usize,
}
711
712impl<const N: usize> ReadBuffer<N> {
713 fn new(max_buf: usize) -> Self {
714 Self {
715 map: HashMap::default(),
716 max_buf,
717 }
718 }
719
720 fn reset(&mut self) {
721 self.map.clear();
722 }
723
724 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
725 let mut done = 0;
726 while done < data.len() {
727 let off = off + done as u64;
728 let sector = off / N as u64;
729 let disp = (off % N as u64) as usize;
730 let amount = min(data.len() - done, N - disp);
731
732 let p = self.map.entry(sector).or_insert_with(|| {
733 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
734 stg.read(sector * N as u64, &mut *p);
735 p
736 });
737 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
738 done += amount;
739 }
740 if self.map.len() >= self.max_buf {
741 self.reset();
742 }
743 }
744}
745
746#[derive(Default)]
747struct DataSlice {
749 pub data: Data,
751 pub off: usize,
753 pub len: usize,
755}
756
757impl DataSlice {
758 pub fn all(&self) -> &[u8] {
760 &self.data[self.off..self.off + self.len]
761 }
762 pub fn part(&self, off: usize, len: usize) -> &[u8] {
764 &self.data[self.off + off..self.off + off + len]
765 }
766 pub fn trim(&mut self, trim: usize) {
768 self.off += trim;
769 self.len -= trim;
770 }
771 #[allow(dead_code)]
773 pub fn take(&mut self) -> Data {
774 std::mem::take(&mut self.data)
775 }
776}
777
/// Map of pending writes, with overlapping writes resolved as they are inserted.
#[derive(Default)]
struct WMap {
    /// Maps the END offset of each write (start + length) to its data.
    map: BTreeMap<u64, DataSlice>,
}
784
impl WMap {
    /// Is the map empty?
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of separate writes held in the map.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Take the map and convert it to a vector of `(start, data)` pairs in key order.
    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = GVec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are end offsets; recover the start offset.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Write every entry of the map into `stg` via `write_data`.
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    /// Record a write of `data[off..off + len]` at `start`, trimming, splitting or removing
    /// any existing writes it overlaps. A zero-length write is ignored.
    #[cfg(not(feature = "pstd"))]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only existing writes that end after `start` can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                let es = ee - v.len as u64; // Existing write start.
                if es >= end {
                    // Existing write starts at or after the end of the new write: done.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write covers the front of the existing write: trim the front.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write covers the existing write entirely: remove it.
                    remove.push(ee);
                } else if end < ee {
                    // New write lies strictly inside the existing write: keep the leading
                    // part as a new entry and trim the existing entry to the trailing part.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write covers the tail of the existing write: keep only the
                    // leading part, re-inserted under its new end offset.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Record a write of `data[off..off + len]` at `start`, trimming, splitting or removing
    /// any existing writes it overlaps. A zero-length write is ignored.
    ///
    /// This variant edits the map in place through a `pstd` cursor.
    #[cfg(feature = "pstd")]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                let es = ee - v.len as u64; // Existing write start.
                if es >= end {
                    // Existing write starts at or after the end of the new write: done.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write covers the front of the existing write: trim the front.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write covers the existing write entirely: remove it.
                    c.remove_prev();
                } else if end < ee {
                    // New write lies strictly inside the existing write: insert the leading
                    // part before it and trim the existing entry to the trailing part.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write covers the tail of the existing write: shorten it in place
                    // and update its key to the new end offset.
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read `data.len()` bytes at `start`, taking bytes from the map where it has them and
    /// from the underlying storage `u` for the gaps.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; // Existing write start.
                let doff = start + done as u64;
                if es > doff {
                    // Gap before the existing write: read it from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Copy from the existing write, skipping any part before the read position.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Anything left lies beyond every write in the map.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
935
/// Storage wrapper that applies each commit in two phases using a separate update log.
pub struct BasicAtomicFile {
    /// Buffered writer for the main storage.
    stg: WriteBuffer,
    /// Buffered writer for the update log storage.
    upd: WriteBuffer,
    /// Writes made since the last commit.
    map: WMap,
    /// Writes being committed, as `(start, data)` pairs.
    list: GVec<(u64, DataSlice)>,
    /// Size of the main storage as of the last commit.
    size: u64,
    /// Set by `shutdown`; once set, `commit` does nothing.
    stop: bool,
}
949
950impl BasicAtomicFile {
951 pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
953 let size = stg.size();
954 let mut result = Box::new(Self {
955 stg: WriteBuffer::new(stg, lim.swbuf),
956 upd: WriteBuffer::new(upd, lim.uwbuf),
957 map: WMap::default(),
958 list: GVec::new(),
959 size,
960 stop: false,
961 });
962 result.init();
963 result
964 }
965
966 fn init(&mut self) {
968 let end = self.upd.stg.read_u64(0);
969 let size = self.upd.stg.read_u64(8);
970 if end == 0 {
971 return;
972 }
973 assert!(end == self.upd.stg.size());
974 let mut pos = 16;
975 while pos < end {
976 let start = self.upd.stg.read_u64(pos);
977 pos += 8;
978 let len = self.upd.stg.read_u64(pos);
979 pos += 8;
980 let mut buf: GVec<u8> = gvec![0; len as usize];
981 self.upd.stg.read(pos, &mut buf);
982 pos += len;
983 self.stg.write(start, &buf);
984 }
985 self.stg.commit(size);
986 self.upd.commit(0);
987 }
988
989 pub fn commit_phase(&mut self, size: u64, phase: u8) {
991 if self.map.is_empty() && self.list.is_empty() {
992 return;
993 }
994 if phase == 1 {
995 self.list = self.map.convert_to_vec();
996
997 self.upd.write_u64(0, 0);
1000 self.upd.write_u64(8, size);
1001 self.upd.commit(16); let mut stg_written = false;
1005 let mut pos: u64 = 16;
1006 for (start, v) in self.list.iter() {
1007 let (start, len, data) = (*start, v.len as u64, v.all());
1008 if start >= self.size {
1009 stg_written = true;
1011 self.stg.write(start, data);
1012 } else {
1013 self.upd.write_u64(pos, start);
1014 pos += 8;
1015 self.upd.write_u64(pos, len);
1016 pos += 8;
1017 self.upd.write(pos, data);
1018 pos += len;
1019 }
1020 }
1021 if stg_written {
1022 self.stg.commit(size);
1023 }
1024 self.upd.commit(pos); self.upd.write_u64(0, pos);
1028 self.upd.write_u64(8, size);
1029 self.upd.commit(pos);
1030 } else {
1031 for (start, v) in self.list.iter() {
1032 if *start < self.size {
1033 self.stg.write(*start, v.all());
1035 }
1036 }
1037 self.list = GVec::new();
1038 self.stg.commit(size);
1039 self.upd.commit(0);
1040 }
1041 }
1042}
1043
1044impl BasicStorage for BasicAtomicFile {
1045 fn commit(&mut self, size: u64) {
1046 if self.stop {
1047 return;
1048 }
1049 self.commit_phase(size, 1);
1050 self.commit_phase(size, 2);
1051 self.size = size;
1052 }
1053
1054 fn size(&self) -> u64 {
1055 self.size
1056 }
1057
1058 fn read(&self, start: u64, data: &mut [u8]) {
1059 self.map.read(start, data, &*self.stg.stg);
1060 }
1061
1062 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1063 self.map.write(start, data, off, len);
1064 }
1065
1066 fn write(&mut self, start: u64, data: &[u8]) {
1067 let len = data.len();
1068 let d = Arc::new(data.to_vec());
1069 self.write_data(start, d, 0, len);
1070 }
1071
1072 fn shutdown(&mut self) {
1073 self.stop = true;
1074 }
1075}
1076
/// File storage for Unix targets that uses positioned reads and writes on a file handle.
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// File size, shared between clones.
    size: Arc<Mutex<u64>>,
    /// The open file.
    f: fs::File,
}
#[cfg(target_family = "unix")]
impl UnixFileStorage {
    /// Open (creating if necessary) the file named `filename` and record its current length.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        let len = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1099
1100#[cfg(target_family = "unix")]
1101impl Storage for UnixFileStorage {
1102 fn clone(&self) -> Box<dyn Storage> {
1103 Box::new(Self {
1104 size: self.size.clone(),
1105 f: self.f.try_clone().unwrap(),
1106 })
1107 }
1108}
1109
1110#[cfg(target_family = "unix")]
1111use std::os::unix::fs::FileExt;
1112
1113#[cfg(target_family = "unix")]
1114impl BasicStorage for UnixFileStorage {
1115 fn read(&self, start: u64, data: &mut [u8]) {
1116 let _ = self.f.read_at(data, start);
1117 }
1118
1119 fn write(&mut self, start: u64, data: &[u8]) {
1120 let _ = self.f.write_at(data, start);
1121 }
1122
1123 fn size(&self) -> u64 {
1124 *self.size.lock().unwrap()
1125 }
1126
1127 fn commit(&mut self, size: u64) {
1128 *self.size.lock().unwrap() = size;
1129 self.f.set_len(size).unwrap();
1130 self.f.sync_all().unwrap();
1131 }
1132}
1133
/// File storage for Windows targets that uses positioned reads and writes on a file handle.
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// File size, shared between clones.
    size: Arc<Mutex<u64>>,
    /// The open file.
    f: fs::File,
}
#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Open (creating if necessary) the file named `filename` and record its current length.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        let len = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1156
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Return another handle: the size is shared and the file handle is duplicated.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1166
1167#[cfg(target_family = "windows")]
1168use std::os::windows::fs::FileExt;
1169
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read into `data` starting at offset `start`.
    ///
    /// Keeps reading until `data` is full or end of file is reached, because a single
    /// `seek_read` call may return fewer bytes than requested. Bytes past end of file are
    /// left untouched. Panics on an I/O error, like the other file storages in this file.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file read failed: {:?}", e),
            }
        }
    }

    /// Write all of `data` at offset `start`.
    ///
    /// Repeats `seek_write` until every byte is written. Panics on an I/O error: silently
    /// dropping a failed or partial write would corrupt the stored data.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => panic!("file write failed: wrote zero bytes"),
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file write failed: {:?}", e),
            }
        }
    }

    /// The shared recorded size (set by `new` and by `commit`).
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record `size`, set the file length to it and sync data and metadata.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1190
/// Alias for `WindowsFileStorage` on Windows targets.
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Alias for `UnixFileStorage` on Unix targets.
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Alias for `AnyFileStorage` on targets that are neither Unix nor Windows.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Alias for `MultiFileStorage` on Windows and Unix targets.
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Alias for `UpdFileStorage` on targets that are neither Windows nor Unix.
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1210
/// Test size multiplier taken from the `TA` environment variable; 1 when it is not set.
///
/// Panics if `TA` is set but is not a valid `usize`.
#[cfg(test)]
fn test_amount() -> usize {
    match std::env::var("TA") {
        Ok(text) => text.parse().unwrap(),
        Err(_) => 1,
    }
}
1216
/// Randomised check that an `AtomicFile` over `MemFile`s reads back the same bytes as a plain
/// `MemFile` given the same sequence of writes and commits.
#[test]
fn test_atomic_file() {
    use rand::Rng;
    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            let off = (rng.r#gen::<usize>() % 100) as u64;
            let len = 1 + rng.r#gen::<usize>() % 20;
            if rng.r#gen::<bool>() {
                // Write the same random bytes to both storages.
                let bytes: Vec<u8> = (0..len).map(|_| rng.r#gen::<u8>()).collect();
                s1.write(off, &bytes);
                s2.write(off, &bytes);
            } else {
                // Both storages must return identical bytes.
                let mut got = vec![0; len];
                let mut want = vec![0; len];
                s1.read(off, &mut got);
                s2.read(off, &mut want);
                assert!(got == want);
            }
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}