1#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24 VecA,
25 collections::{BTreeMapA, btree_map::CustomTuning},
26 localalloc::{GTemp, Temp},
27 veca as tvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::vec as tvec;
32
33#[cfg(feature = "pstd")]
34type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;
35
36#[cfg(not(feature = "pstd"))]
37use std::collections::BTreeMap;
38
39#[cfg(feature = "pstd")]
40type GVec<T> = VecA<T, GTemp>;
41
42#[cfg(not(feature = "pstd"))]
43type GVec<T> = Vec<T>;
44
45#[cfg(feature = "pstd")]
46type TVec<T> = VecA<T, Temp>;
47
48#[cfg(not(feature = "pstd"))]
49type TVec<T> = Vec<T>;
50
51pub type Data = Arc<Vec<u8>>;
53
54pub struct AtomicFile {
71 map: WMap,
73 cf: Arc<RwLock<CommitFile>>,
75 size: u64,
77 tx: std::sync::mpsc::Sender<(u64, WMap)>,
79 busy: Arc<Mutex<()>>,
81 map_lim: usize,
83}
84
85impl AtomicFile {
86 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
88 Self::new_with_limits(stg, upd, &Limits::default())
89 }
90
91 pub fn new_with_limits(
93 stg: Box<dyn Storage>,
94 upd: Box<dyn BasicStorage>,
95 lim: &Limits,
96 ) -> Box<Self> {
97 let size = stg.size();
98 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
99
100 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
101 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
102 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
106
107 std::thread::spawn(move || {
108 while let Ok((size, map)) = rx.recv() {
110 let _lock = busy1.lock();
111 baf.map = map;
112 baf.commit(size);
113 cf1.write().unwrap().done_one();
114 }
115 });
116 Box::new(Self {
117 map: WMap::default(),
118 cf,
119 size,
120 tx,
121 busy,
122 map_lim: lim.map_lim,
123 })
124 }
125}
126
127impl Storage for AtomicFile {
128 fn clone(&self) -> Box<dyn Storage> {
129 panic!()
130 }
131}
132
133impl BasicStorage for AtomicFile {
134 fn commit(&mut self, size: u64) {
135 self.size = size;
136 if self.map.is_empty() {
137 return;
138 }
139 if self.cf.read().unwrap().map.len() > self.map_lim {
140 self.wait_complete();
141 }
142 let map = std::mem::take(&mut self.map);
143 let cf = &mut *self.cf.write().unwrap();
144 cf.todo += 1;
145 map.to_storage(cf);
147 self.tx.send((size, map)).unwrap();
149 }
150
151 fn size(&self) -> u64 {
152 self.size
153 }
154
155 fn read(&self, start: u64, data: &mut [u8]) {
156 self.map.read(start, data, &*self.cf.read().unwrap());
157 }
158
159 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
160 self.map.write(start, data, off, len);
161 }
162
163 fn write(&mut self, start: u64, data: &[u8]) {
164 let len = data.len();
165 let d = Arc::new(data.to_vec());
166 self.write_data(start, d, 0, len);
167 }
168
169 fn wait_complete(&self) {
170 while self.cf.read().unwrap().todo != 0 {
171 let _x = self.busy.lock();
172 }
173 }
174}
175
176struct CommitFile {
177 stg: ReadBufStg<256>,
179 map: WMap,
181 todo: usize,
183}
184
185impl CommitFile {
186 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
187 Self {
188 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
189 map: WMap::default(),
190 todo: 0,
191 }
192 }
193
194 fn done_one(&mut self) {
195 self.todo -= 1;
196 if self.todo == 0 {
197 self.map = WMap::default();
198 self.stg.reset();
199 }
200 }
201}
202
203impl BasicStorage for CommitFile {
204 fn commit(&mut self, _size: u64) {
205 panic!()
206 }
207
208 fn size(&self) -> u64 {
209 panic!()
210 }
211
212 fn read(&self, start: u64, data: &mut [u8]) {
213 self.map.read(start, data, &self.stg);
214 }
215
216 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
217 self.map.write(start, data, off, len);
218 }
219
220 fn write(&mut self, _start: u64, _data: &[u8]) {
221 panic!()
222 }
223}
224
225pub trait BasicStorage: Send {
229 fn size(&self) -> u64;
232
233 fn read(&self, start: u64, data: &mut [u8]);
235
236 fn write(&mut self, start: u64, data: &[u8]);
238
239 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
241 let len = data.len();
242 let d = Arc::new(data);
243 self.write_data(start, d, 0, len);
244 }
245
246 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
248 self.write(start, &data[off..off + len]);
249 }
250
251 fn commit(&mut self, size: u64);
253
254 fn write_u64(&mut self, start: u64, value: u64) {
256 self.write(start, &value.to_le_bytes());
257 }
258
259 fn read_u64(&self, start: u64) -> u64 {
261 let mut bytes = [0; 8];
262 self.read(start, &mut bytes);
263 u64::from_le_bytes(bytes)
264 }
265
266 fn wait_complete(&self) {}
268}
269
270pub trait Storage: BasicStorage + Sync {
272 fn clone(&self) -> Box<dyn Storage>;
274}
275
276#[derive(Default)]
278pub struct MemFile {
279 v: Arc<Mutex<Vec<u8>>>,
280}
281
282impl MemFile {
283 pub fn new() -> Box<Self> {
285 Box::default()
286 }
287}
288
289impl Storage for MemFile {
290 fn clone(&self) -> Box<dyn Storage> {
291 Box::new(Self { v: self.v.clone() })
292 }
293}
294
295impl BasicStorage for MemFile {
296 fn size(&self) -> u64 {
297 let v = self.v.lock().unwrap();
298 v.len() as u64
299 }
300
301 fn read(&self, off: u64, bytes: &mut [u8]) {
302 let off = off as usize;
303 let len = bytes.len();
304 let mut v = self.v.lock().unwrap();
305 if off + len > v.len() {
306 v.resize(off + len, 0);
307 }
308 bytes.copy_from_slice(&v[off..off + len]);
309 }
310
311 fn write(&mut self, off: u64, bytes: &[u8]) {
312 let off = off as usize;
313 let len = bytes.len();
314 let mut v = self.v.lock().unwrap();
315 if off + len > v.len() {
316 v.resize(off + len, 0);
317 }
318 v[off..off + len].copy_from_slice(bytes);
319 }
320
321 fn commit(&mut self, size: u64) {
322 let mut v = self.v.lock().unwrap();
323 v.resize(size as usize, 0);
324 }
325}
326
327use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
328
329struct FileInner {
330 f: fs::File,
331}
332
333impl FileInner {
334 pub fn new(filename: &str) -> Self {
336 Self {
337 f: OpenOptions::new()
338 .read(true)
339 .write(true)
340 .create(true)
341 .truncate(false)
342 .open(filename)
343 .unwrap(),
344 }
345 }
346
347 fn size(&mut self) -> u64 {
348 self.f.seek(SeekFrom::End(0)).unwrap()
349 }
350
351 fn read(&mut self, off: u64, bytes: &mut [u8]) {
352 self.f.seek(SeekFrom::Start(off)).unwrap();
353 let _ = self.f.read(bytes).unwrap();
354 }
355
356 fn write(&mut self, off: u64, bytes: &[u8]) {
357 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
359 {
360 let size = self.f.seek(SeekFrom::End(0)).unwrap();
361 if off > size {
362 self.f.set_len(off).unwrap();
363 }
364 }
365 self.f.seek(SeekFrom::Start(off)).unwrap();
366 let _ = self.f.write(bytes).unwrap();
367 }
368
369 fn commit(&mut self, size: u64) {
370 self.f.set_len(size).unwrap();
371 self.f.sync_all().unwrap();
372 }
373}
374
375pub struct UpdFileStorage {
377 file: Cell<Option<FileInner>>,
378}
379
380impl UpdFileStorage {
381 pub fn new(filename: &str) -> Box<Self> {
383 Box::new(Self {
384 file: Cell::new(Some(FileInner::new(filename))),
385 })
386 }
387}
388
389impl BasicStorage for UpdFileStorage {
390 fn size(&self) -> u64 {
391 let mut f = self.file.take().unwrap();
392 let result = f.size();
393 self.file.set(Some(f));
394 result
395 }
396 fn read(&self, off: u64, bytes: &mut [u8]) {
397 let mut f = self.file.take().unwrap();
398 f.read(off, bytes);
399 self.file.set(Some(f));
400 }
401
402 fn write(&mut self, off: u64, bytes: &[u8]) {
403 let mut f = self.file.take().unwrap();
404 f.write(off, bytes);
405 self.file.set(Some(f));
406 }
407
408 fn commit(&mut self, size: u64) {
409 let mut f = self.file.take().unwrap();
410 f.commit(size);
411 self.file.set(Some(f));
412 }
413}
414
415pub struct SimpleFileStorage {
417 file: Arc<Mutex<FileInner>>,
418}
419
420impl SimpleFileStorage {
421 pub fn new(filename: &str) -> Box<Self> {
423 Box::new(Self {
424 file: Arc::new(Mutex::new(FileInner::new(filename))),
425 })
426 }
427}
428
429impl Storage for SimpleFileStorage {
430 fn clone(&self) -> Box<dyn Storage> {
431 Box::new(Self {
432 file: self.file.clone(),
433 })
434 }
435}
436
437impl BasicStorage for SimpleFileStorage {
438 fn size(&self) -> u64 {
439 self.file.lock().unwrap().size()
440 }
441
442 fn read(&self, off: u64, bytes: &mut [u8]) {
443 self.file.lock().unwrap().read(off, bytes);
444 }
445
446 fn write(&mut self, off: u64, bytes: &[u8]) {
447 self.file.lock().unwrap().write(off, bytes);
448 }
449
450 fn commit(&mut self, size: u64) {
451 self.file.lock().unwrap().commit(size);
452 }
453}
454
455pub struct AnyFileStorage {
457 filename: String,
458 files: Arc<Mutex<Vec<FileInner>>>,
459}
460
461impl AnyFileStorage {
462 pub fn new(filename: &str) -> Box<Self> {
464 Box::new(Self {
465 filename: filename.to_owned(),
466 files: Arc::new(Mutex::new(Vec::new())),
467 })
468 }
469
470 fn get_file(&self) -> FileInner {
471 match self.files.lock().unwrap().pop() {
472 Some(f) => f,
473 _ => FileInner::new(&self.filename),
474 }
475 }
476
477 fn put_file(&self, f: FileInner) {
478 self.files.lock().unwrap().push(f);
479 }
480}
481
482impl Storage for AnyFileStorage {
483 fn clone(&self) -> Box<dyn Storage> {
484 Box::new(Self {
485 filename: self.filename.clone(),
486 files: self.files.clone(),
487 })
488 }
489}
490
491impl BasicStorage for AnyFileStorage {
492 fn size(&self) -> u64 {
493 let mut f = self.get_file();
494 let result = f.size();
495 self.put_file(f);
496 result
497 }
498
499 fn read(&self, off: u64, bytes: &mut [u8]) {
500 let mut f = self.get_file();
501 f.read(off, bytes);
502 self.put_file(f);
503 }
504
505 fn write(&mut self, off: u64, bytes: &[u8]) {
506 let mut f = self.get_file();
507 f.write(off, bytes);
508 self.put_file(f);
509 }
510
511 fn commit(&mut self, size: u64) {
512 let mut f = self.get_file();
513 f.commit(size);
514 self.put_file(f);
515 }
516}
517
518pub struct DummyFile {}
520impl DummyFile {
521 pub fn new() -> Box<Self> {
523 Box::new(Self {})
524 }
525}
526
527impl Storage for DummyFile {
528 fn clone(&self) -> Box<dyn Storage> {
529 Self::new()
530 }
531}
532
533impl BasicStorage for DummyFile {
534 fn size(&self) -> u64 {
535 0
536 }
537
538 fn read(&self, _off: u64, _bytes: &mut [u8]) {}
539
540 fn write(&mut self, _off: u64, _bytes: &[u8]) {}
541
542 fn commit(&mut self, _size: u64) {}
543}
544
545#[non_exhaustive]
547pub struct Limits {
548 pub map_lim: usize,
550 pub rbuf_mem: usize,
552 pub swbuf: usize,
554 pub uwbuf: usize,
556}
557
558impl Default for Limits {
559 fn default() -> Self {
560 Self {
561 map_lim: 5000,
562 rbuf_mem: 0x200000,
563 swbuf: 0x100000,
564 uwbuf: 0x100000,
565 }
566 }
567}
568
569struct WriteBuffer {
571 ix: usize,
573 pos: u64,
575 pub stg: Box<dyn BasicStorage>,
577 buf: Vec<u8>,
579}
580
581impl WriteBuffer {
582 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
584 Self {
585 ix: 0,
586 pos: u64::MAX,
587 stg,
588 buf: vec![0; buf_size],
589 }
590 }
591
592 pub fn write(&mut self, off: u64, data: &[u8]) {
594 if self.pos + self.ix as u64 != off {
595 self.flush(off);
596 }
597 let mut done: usize = 0;
598 let mut todo: usize = data.len();
599 while todo > 0 {
600 let mut n: usize = self.buf.len() - self.ix;
601 if n == 0 {
602 self.flush(off + done as u64);
603 n = self.buf.len();
604 }
605 if n > todo {
606 n = todo;
607 }
608 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
609 todo -= n;
610 done += n;
611 self.ix += n;
612 }
613 }
614
615 fn flush(&mut self, new_pos: u64) {
616 if self.ix > 0 {
617 self.stg.write(self.pos, &self.buf[0..self.ix]);
618 }
619 self.ix = 0;
620 self.pos = new_pos;
621 }
622
623 pub fn commit(&mut self, size: u64) {
625 self.flush(u64::MAX);
626 self.stg.commit(size);
627 }
628
629 pub fn write_u64(&mut self, start: u64, value: u64) {
631 self.write(start, &value.to_le_bytes());
632 }
633}
634
635struct ReadBufStg<const N: usize> {
641 stg: Box<dyn Storage>,
643 buf: Mutex<ReadBuffer<N>>,
645 limit: usize,
647}
648
649impl<const N: usize> Drop for ReadBufStg<N> {
650 fn drop(&mut self) {
651 self.reset();
652 }
653}
654
655impl<const N: usize> ReadBufStg<N> {
656 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
658 Self {
659 stg,
660 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
661 limit,
662 }
663 }
664
665 fn reset(&mut self) {
667 self.buf.lock().unwrap().reset();
668 }
669}
670
671impl<const N: usize> BasicStorage for ReadBufStg<N> {
672 fn read(&self, start: u64, data: &mut [u8]) {
674 if data.len() <= self.limit {
675 self.buf.lock().unwrap().read(&*self.stg, start, data);
676 } else {
677 self.stg.read(start, data);
678 }
679 }
680
681 fn size(&self) -> u64 {
683 panic!()
684 }
685
686 fn write(&mut self, _start: u64, _data: &[u8]) {
688 panic!();
689 }
690
691 fn commit(&mut self, _size: u64) {
693 panic!();
694 }
695}
696
697struct ReadBuffer<const N: usize> {
698 map: HashMap<u64, Box<[u8; N]>>,
700 max_buf: usize,
702}
703
704impl<const N: usize> ReadBuffer<N> {
705 fn new(max_buf: usize) -> Self {
706 Self {
707 map: HashMap::default(),
708 max_buf,
709 }
710 }
711
712 fn reset(&mut self) {
713 self.map.clear();
714 }
715
716 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
717 let mut done = 0;
718 while done < data.len() {
719 let off = off + done as u64;
720 let sector = off / N as u64;
721 let disp = (off % N as u64) as usize;
722 let amount = min(data.len() - done, N - disp);
723
724 let p = self.map.entry(sector).or_insert_with(|| {
725 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
726 stg.read(sector * N as u64, &mut *p);
727 p
728 });
729 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
730 done += amount;
731 }
732 if self.map.len() >= self.max_buf {
733 self.reset();
734 }
735 }
736}
737
738#[derive(Default)]
739struct DataSlice {
741 pub data: Data,
743 pub off: usize,
745 pub len: usize,
747}
748
749impl DataSlice {
750 pub fn all(&self) -> &[u8] {
752 &self.data[self.off..self.off + self.len]
753 }
754 pub fn part(&self, off: usize, len: usize) -> &[u8] {
756 &self.data[self.off + off..self.off + off + len]
757 }
758 pub fn trim(&mut self, trim: usize) {
760 self.off += trim;
761 self.len -= trim;
762 }
763 #[allow(dead_code)]
765 pub fn take(&mut self) -> Data {
766 std::mem::take(&mut self.data)
767 }
768}
769
770#[derive(Default)]
771struct WMap {
773 map: BTreeMap<u64, DataSlice>,
775}
776
777impl WMap {
778 pub fn is_empty(&self) -> bool {
780 self.map.is_empty()
781 }
782
783 pub fn len(&self) -> usize {
785 self.map.len()
786 }
787
788 pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
790 let map = std::mem::take(&mut self.map);
791 let mut result = GVec::with_capacity(map.len());
792 for (end, v) in map {
793 let start = end - v.len as u64;
794 result.push((start, v));
795 }
796 result
797 }
798
799 pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
801 for (end, v) in self.map.iter() {
802 let start = end - v.len as u64;
803 stg.write_data(start, v.data.clone(), v.off, v.len);
804 }
805 }
806
807 #[cfg(not(feature = "pstd"))]
808 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
810 if len != 0 {
811 let (mut insert, mut remove) = (Vec::new(), Vec::new());
812 let end = start + len as u64;
813 for (ee, v) in self.map.range_mut(start + 1..) {
814 let ee = *ee;
815 let es = ee - v.len as u64; if es >= end {
817 break;
819 } else if start <= es {
820 if end < ee {
821 v.trim((end - es) as usize);
823 break;
824 }
825 remove.push(ee);
827 } else if end < ee {
828 insert.push((es, v.data.clone(), v.off, (start - es) as usize));
831 v.trim((end - es) as usize);
832 break;
833 } else {
834 insert.push((es, v.take(), v.off, (start - es) as usize));
837 remove.push(ee);
838 }
839 }
840 for end in remove {
841 self.map.remove(&end);
842 }
843 for (start, data, off, len) in insert {
844 self.map
845 .insert(start + len as u64, DataSlice { data, off, len });
846 }
847 self.map
848 .insert(start + len as u64, DataSlice { data, off, len });
849 }
850 }
851
852 #[cfg(feature = "pstd")]
853 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
855 if len != 0 {
856 let end = start + len as u64;
857 let mut c = self
858 .map
859 .lower_bound_mut(std::ops::Bound::Excluded(&start))
860 .with_mutable_key();
861 while let Some((eend, v)) = c.next() {
862 let ee = *eend;
863 let es = ee - v.len as u64; if es >= end {
865 c.prev();
867 break;
868 } else if start <= es {
869 if end < ee {
870 v.trim((end - es) as usize);
872 c.prev();
873 break;
874 }
875 c.remove_prev();
877 } else if end < ee {
878 let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
881 v.trim((end - es) as usize);
882 c.prev();
883 c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
884 break;
885 } else {
886 v.len = (start - es) as usize;
889 *eend = es + v.len as u64;
890 }
891 }
892 c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
894 }
895 }
896
897 pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
899 let len = data.len();
900 if len != 0 {
901 let mut done = 0;
902 for (&end, v) in self.map.range(start + 1..) {
903 let es = end - v.len as u64; let doff = start + done as u64;
905 if es > doff {
906 let a = min(len - done, (es - doff) as usize);
908 u.read(doff, &mut data[done..done + a]);
909 done += a;
910 if done == len {
911 return;
912 }
913 }
914 let skip = (start + done as u64 - es) as usize;
916 let a = min(len - done, v.len - skip);
917 data[done..done + a].copy_from_slice(v.part(skip, a));
918 done += a;
919 if done == len {
920 return;
921 }
922 }
923 u.read(start + done as u64, &mut data[done..]);
924 }
925 }
926}
927
928pub struct BasicAtomicFile {
930 stg: WriteBuffer,
932 upd: WriteBuffer,
934 map: WMap,
936 list: GVec<(u64, DataSlice)>,
938 size: u64,
939}
940
941impl BasicAtomicFile {
942 pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
944 let size = stg.size();
945 let mut result = Box::new(Self {
946 stg: WriteBuffer::new(stg, lim.swbuf),
947 upd: WriteBuffer::new(upd, lim.uwbuf),
948 map: WMap::default(),
949 list: GVec::new(),
950 size,
951 });
952 result.init();
953 result
954 }
955
956 fn init(&mut self) {
958 let end = self.upd.stg.read_u64(0);
959 let size = self.upd.stg.read_u64(8);
960 if end == 0 {
961 return;
962 }
963 assert!(end == self.upd.stg.size());
964 let mut pos = 16;
965 while pos < end {
966 let start = self.upd.stg.read_u64(pos);
967 pos += 8;
968 let len = self.upd.stg.read_u64(pos);
969 pos += 8;
970 let mut buf: TVec<u8> = tvec![0; len as usize];
971 self.upd.stg.read(pos, &mut buf);
972 pos += len;
973 self.stg.write(start, &buf);
974 }
975 self.stg.commit(size);
976 self.upd.commit(0);
977 }
978
979 pub fn commit_phase(&mut self, size: u64, phase: u8) {
981 if self.map.is_empty() && self.list.is_empty() {
982 return;
983 }
984 if phase == 1 {
985 self.list = self.map.convert_to_vec();
986
987 self.upd.write_u64(0, 0);
990 self.upd.write_u64(8, size);
991 self.upd.commit(16); let mut stg_written = false;
995 let mut pos: u64 = 16;
996 for (start, v) in self.list.iter() {
997 let (start, len, data) = (*start, v.len as u64, v.all());
998 if start >= self.size {
999 stg_written = true;
1001 self.stg.write(start, data);
1002 } else {
1003 self.upd.write_u64(pos, start);
1004 pos += 8;
1005 self.upd.write_u64(pos, len);
1006 pos += 8;
1007 self.upd.write(pos, data);
1008 pos += len;
1009 }
1010 }
1011 if stg_written {
1012 self.stg.commit(size);
1013 }
1014 self.upd.commit(pos); self.upd.write_u64(0, pos);
1018 self.upd.write_u64(8, size);
1019 self.upd.commit(pos);
1020 } else {
1021 for (start, v) in self.list.iter() {
1022 if *start < self.size {
1023 self.stg.write(*start, v.all());
1025 }
1026 }
1027 self.list = GVec::new();
1028 self.stg.commit(size);
1029 self.upd.commit(0);
1030 }
1031 }
1032}
1033
1034impl BasicStorage for BasicAtomicFile {
1035 fn commit(&mut self, size: u64) {
1036 self.commit_phase(size, 1);
1037 self.commit_phase(size, 2);
1038 self.size = size;
1039 }
1040
1041 fn size(&self) -> u64 {
1042 self.size
1043 }
1044
1045 fn read(&self, start: u64, data: &mut [u8]) {
1046 self.map.read(start, data, &*self.stg.stg);
1047 }
1048
1049 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1050 self.map.write(start, data, off, len);
1051 }
1052
1053 fn write(&mut self, start: u64, data: &[u8]) {
1054 let len = data.len();
1055 let d = Arc::new(data.to_vec());
1056 self.write_data(start, d, 0, len);
1057 }
1058}
1059
1060#[cfg(target_family = "unix")]
1062pub struct UnixFileStorage {
1063 size: Arc<Mutex<u64>>,
1064 f: fs::File,
1065}
1066#[cfg(target_family = "unix")]
1067impl UnixFileStorage {
1068 pub fn new(filename: &str) -> Box<Self> {
1070 let mut f = OpenOptions::new()
1071 .read(true)
1072 .write(true)
1073 .create(true)
1074 .truncate(false)
1075 .open(filename)
1076 .unwrap();
1077 let size = f.seek(SeekFrom::End(0)).unwrap();
1078 let size = Arc::new(Mutex::new(size));
1079 Box::new(Self { size, f })
1080 }
1081}
1082
1083#[cfg(target_family = "unix")]
1084impl Storage for UnixFileStorage {
1085 fn clone(&self) -> Box<dyn Storage> {
1086 Box::new(Self {
1087 size: self.size.clone(),
1088 f: self.f.try_clone().unwrap(),
1089 })
1090 }
1091}
1092
1093#[cfg(target_family = "unix")]
1094use std::os::unix::fs::FileExt;
1095
1096#[cfg(target_family = "unix")]
1097impl BasicStorage for UnixFileStorage {
1098 fn read(&self, start: u64, data: &mut [u8]) {
1099 let _ = self.f.read_at(data, start);
1100 }
1101
1102 fn write(&mut self, start: u64, data: &[u8]) {
1103 let _ = self.f.write_at(data, start);
1104 }
1105
1106 fn size(&self) -> u64 {
1107 *self.size.lock().unwrap()
1108 }
1109
1110 fn commit(&mut self, size: u64) {
1111 *self.size.lock().unwrap() = size;
1112 self.f.set_len(size).unwrap();
1113 self.f.sync_all().unwrap();
1114 }
1115}
1116
1117#[cfg(target_family = "windows")]
1119pub struct WindowsFileStorage {
1120 size: Arc<Mutex<u64>>,
1121 f: fs::File,
1122}
1123#[cfg(target_family = "windows")]
1124impl WindowsFileStorage {
1125 pub fn new(filename: &str) -> Box<Self> {
1127 let mut f = OpenOptions::new()
1128 .read(true)
1129 .write(true)
1130 .create(true)
1131 .truncate(false)
1132 .open(filename)
1133 .unwrap();
1134 let size = f.seek(SeekFrom::End(0)).unwrap();
1135 let size = Arc::new(Mutex::new(size));
1136 Box::new(Self { size, f })
1137 }
1138}
1139
1140#[cfg(target_family = "windows")]
1141impl Storage for WindowsFileStorage {
1142 fn clone(&self) -> Box<dyn Storage> {
1143 Box::new(Self {
1144 size: self.size.clone(),
1145 f: self.f.try_clone().unwrap(),
1146 })
1147 }
1148}
1149
1150#[cfg(target_family = "windows")]
1151use std::os::windows::fs::FileExt;
1152
1153#[cfg(target_family = "windows")]
1154impl BasicStorage for WindowsFileStorage {
1155 fn read(&self, start: u64, data: &mut [u8]) {
1156 let _ = self.f.seek_read(data, start);
1157 }
1158
1159 fn write(&mut self, start: u64, data: &[u8]) {
1160 let _ = self.f.seek_write(data, start);
1161 }
1162
1163 fn size(&self) -> u64 {
1164 *self.size.lock().unwrap()
1165 }
1166
1167 fn commit(&mut self, size: u64) {
1168 *self.size.lock().unwrap() = size;
1169 self.f.set_len(size).unwrap();
1170 self.f.sync_all().unwrap();
1171 }
1172}
1173
1174#[cfg(target_family = "windows")]
1176pub type MultiFileStorage = WindowsFileStorage;
1177
1178#[cfg(target_family = "unix")]
1180pub type MultiFileStorage = UnixFileStorage;
1181
1182#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1184pub type MultiFileStorage = AnyFileStorage;
1185
1186#[cfg(any(target_family = "windows", target_family = "unix"))]
1188pub type FastFileStorage = MultiFileStorage;
1189
1190#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1192pub type FastFileStorage = UpdFileStorage;
1193
1194#[cfg(test)]
1195fn test_amount() -> usize {
1197 str::parse(&std::env::var("TA").unwrap_or("1".to_string())).unwrap()
1198}
1199
1200#[test]
1201fn test_atomic_file() {
1202 use rand::Rng;
1203 let ta = test_amount();
1206 println!(" Test amount={}", ta);
1207
1208 let mut rng = rand::thread_rng();
1209
1210 for _ in 0..100 {
1211 let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
1212 let mut s2 = MemFile::new();
1214
1215 for _ in 0..1000 * ta {
1216 let off: usize = rng.r#gen::<usize>() % 100;
1217 let mut len = 1 + rng.r#gen::<usize>() % 20;
1218 let w: bool = rng.r#gen();
1219 if w {
1220 let mut bytes = Vec::new();
1221 while len > 0 {
1222 len -= 1;
1223 let b: u8 = rng.r#gen::<u8>();
1224 bytes.push(b);
1225 }
1226 s1.write(off as u64, &bytes);
1227 s2.write(off as u64, &bytes);
1228 } else {
1229 let mut b2 = vec![0; len];
1230 let mut b3 = vec![0; len];
1231 s1.read(off as u64, &mut b2);
1232 s2.read(off as u64, &mut b3);
1233 assert!(b2 == b3);
1234 }
1235 if rng.r#gen::<usize>() % 50 == 0 {
1236 s1.commit(200);
1237 s2.commit(200);
1238 }
1239 }
1240 }
1241}