1#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24 VecA,
25 collections::{BTreeMapA, btree_map::CustomTuning},
26 localalloc::GTemp,
27 veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
33#[cfg(feature = "pstd")]
34type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;
35
36#[cfg(feature = "pstd")]
37type GVec<T> = VecA<T, GTemp>;
38
39pub type Data = Arc<Vec<u8>>;
41
42pub struct AtomicFile {
59 map: WMap,
61 cf: Arc<RwLock<CommitFile>>,
63 size: u64,
65 tx: std::sync::mpsc::Sender<(u64, WMap)>,
67 busy: Arc<Mutex<()>>,
69 map_lim: usize,
71}
72
73impl AtomicFile {
74 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76 Self::new_with_limits(stg, upd, &Limits::default())
77 }
78
79 pub fn new_with_limits(
81 stg: Box<dyn Storage>,
82 upd: Box<dyn BasicStorage>,
83 lim: &Limits,
84 ) -> Box<Self> {
85 let size = stg.size();
86 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
94
95 std::thread::spawn(move || {
96 while let Ok((size, map)) = rx.recv() {
98 let _lock = busy1.lock();
99 baf.map = map;
100 baf.commit(size);
101 cf1.write().unwrap().done_one();
102 }
103 });
104 Box::new(Self {
105 map: WMap::default(),
106 cf,
107 size,
108 tx,
109 busy,
110 map_lim: lim.map_lim,
111 })
112 }
113}
114
115impl Storage for AtomicFile {
116 fn clone(&self) -> Box<dyn Storage> {
117 panic!()
118 }
119}
120
121impl BasicStorage for AtomicFile {
122 fn commit(&mut self, size: u64) {
123 self.size = size;
124 if self.map.is_empty() {
125 return;
126 }
127 if self.cf.read().unwrap().map.len() > self.map_lim {
128 self.wait_complete();
129 }
130 let map = std::mem::take(&mut self.map);
131 let cf = &mut *self.cf.write().unwrap();
132 if cf.stop { return; } cf.todo += 1;
134 map.to_storage(cf);
136 self.tx.send((size, map)).unwrap();
138 }
139
140 fn size(&self) -> u64 {
141 self.size
142 }
143
144 fn read(&self, start: u64, data: &mut [u8]) {
145 self.map.read(start, data, &*self.cf.read().unwrap());
146 }
147
148 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
149 self.map.write(start, data, off, len);
150 }
151
152 fn write(&mut self, start: u64, data: &[u8]) {
153 let len = data.len();
154 let d = Arc::new(data.to_vec());
155 self.write_data(start, d, 0, len);
156 }
157
158 fn wait_complete(&self) {
159 while self.cf.read().unwrap().todo != 0 {
160 let _x = self.busy.lock();
161 }
162 }
163
164 fn shutdown(&mut self) {
165 self.cf.write().unwrap().stop = true; self.wait_complete(); }
168}
169
170struct CommitFile {
171 stg: ReadBufStg<256>,
173 map: WMap,
175 todo: usize,
177 stop: bool,
179}
180
181impl CommitFile {
182 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
183 Self {
184 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
185 map: WMap::default(),
186 todo: 0,
187 stop: false,
188 }
189 }
190
191 fn done_one(&mut self) {
192 self.todo -= 1;
193 if self.todo == 0 {
194 self.map = WMap::default();
195 self.stg.reset();
196 }
197 }
198}
199
200impl BasicStorage for CommitFile {
201 fn commit(&mut self, _size: u64) {
202 panic!()
203 }
204
205 fn size(&self) -> u64 {
206 panic!()
207 }
208
209 fn read(&self, start: u64, data: &mut [u8]) {
210 self.map.read(start, data, &self.stg);
211 }
212
213 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
214 self.map.write(start, data, off, len);
215 }
216
217 fn write(&mut self, _start: u64, _data: &[u8]) {
218 panic!()
219 }
220}
221
222pub trait BasicStorage: Send {
226 fn size(&self) -> u64;
229
230 fn read(&self, start: u64, data: &mut [u8]);
232
233 fn write(&mut self, start: u64, data: &[u8]);
235
236 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
238 let len = data.len();
239 let d = Arc::new(data);
240 self.write_data(start, d, 0, len);
241 }
242
243 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
245 self.write(start, &data[off..off + len]);
246 }
247
248 fn commit(&mut self, size: u64);
250
251 fn write_u64(&mut self, start: u64, value: u64) {
253 self.write(start, &value.to_le_bytes());
254 }
255
256 fn read_u64(&self, start: u64) -> u64 {
258 let mut bytes = [0; 8];
259 self.read(start, &mut bytes);
260 u64::from_le_bytes(bytes)
261 }
262
263 fn wait_complete(&self){}
265
266 fn shutdown(&mut self){}
268}
269
270pub trait Storage: BasicStorage + Sync {
272 fn clone(&self) -> Box<dyn Storage>;
274}
275
276#[derive(Default)]
278pub struct MemFile {
279 v: Arc<Mutex<Vec<u8>>>,
280}
281
282impl MemFile {
283 pub fn new() -> Box<Self> {
285 Box::default()
286 }
287}
288
289impl Storage for MemFile {
290 fn clone(&self) -> Box<dyn Storage> {
291 Box::new(Self { v: self.v.clone() })
292 }
293}
294
295impl BasicStorage for MemFile {
296 fn size(&self) -> u64 {
297 let v = self.v.lock().unwrap();
298 v.len() as u64
299 }
300
301 fn read(&self, off: u64, bytes: &mut [u8]) {
302 let off = off as usize;
303 let len = bytes.len();
304 let mut v = self.v.lock().unwrap();
305 if off + len > v.len() {
306 v.resize(off + len, 0);
307 }
308 bytes.copy_from_slice(&v[off..off + len]);
309 }
310
311 fn write(&mut self, off: u64, bytes: &[u8]) {
312 let off = off as usize;
313 let len = bytes.len();
314 let mut v = self.v.lock().unwrap();
315 if off + len > v.len() {
316 v.resize(off + len, 0);
317 }
318 v[off..off + len].copy_from_slice(bytes);
319 }
320
321 fn commit(&mut self, size: u64) {
322 let mut v = self.v.lock().unwrap();
323 v.resize(size as usize, 0);
324 }
325}
326
327use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
328
329struct FileInner {
330 f: fs::File,
331}
332
333impl FileInner {
334 pub fn new(filename: &str) -> Self {
336 Self {
337 f: OpenOptions::new()
338 .read(true)
339 .write(true)
340 .create(true)
341 .truncate(false)
342 .open(filename)
343 .unwrap(),
344 }
345 }
346
347 fn size(&mut self) -> u64 {
348 self.f.seek(SeekFrom::End(0)).unwrap()
349 }
350
351 fn read(&mut self, off: u64, bytes: &mut [u8]) {
352 self.f.seek(SeekFrom::Start(off)).unwrap();
353 let _ = self.f.read(bytes).unwrap();
354 }
355
356 fn write(&mut self, off: u64, bytes: &[u8]) {
357 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
359 {
360 let size = self.f.seek(SeekFrom::End(0)).unwrap();
361 if off > size {
362 self.f.set_len(off).unwrap();
363 }
364 }
365 self.f.seek(SeekFrom::Start(off)).unwrap();
366 let _ = self.f.write(bytes).unwrap();
367 }
368
369 fn commit(&mut self, size: u64) {
370 self.f.set_len(size).unwrap();
371 self.f.sync_all().unwrap();
372 }
373}
374
375pub struct UpdFileStorage {
377 file: Cell<Option<FileInner>>,
378}
379
380impl UpdFileStorage {
381 pub fn new(filename: &str) -> Box<Self> {
383 Box::new(Self {
384 file: Cell::new(Some(FileInner::new(filename))),
385 })
386 }
387}
388
389impl BasicStorage for UpdFileStorage {
390 fn size(&self) -> u64 {
391 let mut f = self.file.take().unwrap();
392 let result = f.size();
393 self.file.set(Some(f));
394 result
395 }
396 fn read(&self, off: u64, bytes: &mut [u8]) {
397 let mut f = self.file.take().unwrap();
398 f.read(off, bytes);
399 self.file.set(Some(f));
400 }
401
402 fn write(&mut self, off: u64, bytes: &[u8]) {
403 let mut f = self.file.take().unwrap();
404 f.write(off, bytes);
405 self.file.set(Some(f));
406 }
407
408 fn commit(&mut self, size: u64) {
409 let mut f = self.file.take().unwrap();
410 f.commit(size);
411 self.file.set(Some(f));
412 }
413}
414
415pub struct SimpleFileStorage {
417 file: Arc<Mutex<FileInner>>,
418}
419
420impl SimpleFileStorage {
421 pub fn new(filename: &str) -> Box<Self> {
423 Box::new(Self {
424 file: Arc::new(Mutex::new(FileInner::new(filename))),
425 })
426 }
427}
428
429impl Storage for SimpleFileStorage {
430 fn clone(&self) -> Box<dyn Storage> {
431 Box::new(Self {
432 file: self.file.clone(),
433 })
434 }
435}
436
437impl BasicStorage for SimpleFileStorage {
438 fn size(&self) -> u64 {
439 self.file.lock().unwrap().size()
440 }
441
442 fn read(&self, off: u64, bytes: &mut [u8]) {
443 self.file.lock().unwrap().read(off, bytes);
444 }
445
446 fn write(&mut self, off: u64, bytes: &[u8]) {
447 self.file.lock().unwrap().write(off, bytes);
448 }
449
450 fn commit(&mut self, size: u64) {
451 self.file.lock().unwrap().commit(size);
452 }
453}
454
455pub struct AnyFileStorage {
457 filename: String,
458 files: Arc<Mutex<Vec<FileInner>>>,
459}
460
461impl AnyFileStorage {
462 pub fn new(filename: &str) -> Box<Self> {
464 Box::new(Self {
465 filename: filename.to_owned(),
466 files: Arc::new(Mutex::new(Vec::new())),
467 })
468 }
469
470 fn get_file(&self) -> FileInner {
471 match self.files.lock().unwrap().pop() {
472 Some(f) => f,
473 _ => FileInner::new(&self.filename),
474 }
475 }
476
477 fn put_file(&self, f: FileInner) {
478 self.files.lock().unwrap().push(f);
479 }
480}
481
482impl Storage for AnyFileStorage {
483 fn clone(&self) -> Box<dyn Storage> {
484 Box::new(Self {
485 filename: self.filename.clone(),
486 files: self.files.clone(),
487 })
488 }
489}
490
491impl BasicStorage for AnyFileStorage {
492 fn size(&self) -> u64 {
493 let mut f = self.get_file();
494 let result = f.size();
495 self.put_file(f);
496 result
497 }
498
499 fn read(&self, off: u64, bytes: &mut [u8]) {
500 let mut f = self.get_file();
501 f.read(off, bytes);
502 self.put_file(f);
503 }
504
505 fn write(&mut self, off: u64, bytes: &[u8]) {
506 let mut f = self.get_file();
507 f.write(off, bytes);
508 self.put_file(f);
509 }
510
511 fn commit(&mut self, size: u64) {
512 let mut f = self.get_file();
513 f.commit(size);
514 self.put_file(f);
515 }
516}
517
518pub struct DummyFile {}
520impl DummyFile {
521 pub fn new() -> Box<Self> {
523 Box::new(Self {})
524 }
525}
526
527impl Storage for DummyFile {
528 fn clone(&self) -> Box<dyn Storage> {
529 Self::new()
530 }
531}
532
533impl BasicStorage for DummyFile {
534 fn size(&self) -> u64 {
535 0
536 }
537
538 fn read(&self, _off: u64, _bytes: &mut [u8]) {}
539
540 fn write(&mut self, _off: u64, _bytes: &[u8]) {}
541
542 fn commit(&mut self, _size: u64) {}
543}
544
545#[non_exhaustive]
547pub struct Limits {
548 pub map_lim: usize,
550 pub rbuf_mem: usize,
552 pub swbuf: usize,
554 pub uwbuf: usize,
556}
557
558impl Default for Limits {
559 fn default() -> Self {
560 Self {
561 map_lim: 5000,
562 rbuf_mem: 0x200000,
563 swbuf: 0x100000,
564 uwbuf: 0x100000,
565 }
566 }
567}
568
569struct WriteBuffer {
571 ix: usize,
573 pos: u64,
575 pub stg: Box<dyn BasicStorage>,
577 buf: Vec<u8>,
579}
580
581impl WriteBuffer {
582 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
584 Self {
585 ix: 0,
586 pos: u64::MAX,
587 stg,
588 buf: vec![0; buf_size],
589 }
590 }
591
592 pub fn write(&mut self, off: u64, data: &[u8]) {
594 if self.pos + self.ix as u64 != off {
595 self.flush(off);
596 }
597 let mut done: usize = 0;
598 let mut todo: usize = data.len();
599 while todo > 0 {
600 let mut n: usize = self.buf.len() - self.ix;
601 if n == 0 {
602 self.flush(off + done as u64);
603 n = self.buf.len();
604 }
605 if n > todo {
606 n = todo;
607 }
608 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
609 todo -= n;
610 done += n;
611 self.ix += n;
612 }
613 }
614
615 fn flush(&mut self, new_pos: u64) {
616 if self.ix > 0 {
617 self.stg.write(self.pos, &self.buf[0..self.ix]);
618 }
619 self.ix = 0;
620 self.pos = new_pos;
621 }
622
623 pub fn commit(&mut self, size: u64) {
625 self.flush(u64::MAX);
626 self.stg.commit(size);
627 }
628
629 pub fn write_u64(&mut self, start: u64, value: u64) {
631 self.write(start, &value.to_le_bytes());
632 }
633}
634
635struct ReadBufStg<const N: usize> {
641 stg: Box<dyn Storage>,
643 buf: Mutex<ReadBuffer<N>>,
645 limit: usize,
647}
648
649impl<const N: usize> Drop for ReadBufStg<N> {
650 fn drop(&mut self) {
651 self.reset();
652 }
653}
654
655impl<const N: usize> ReadBufStg<N> {
656 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
658 Self {
659 stg,
660 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
661 limit,
662 }
663 }
664
665 fn reset(&mut self) {
667 self.buf.lock().unwrap().reset();
668 }
669}
670
671impl<const N: usize> BasicStorage for ReadBufStg<N> {
672 fn read(&self, start: u64, data: &mut [u8]) {
674 if data.len() <= self.limit {
675 self.buf.lock().unwrap().read(&*self.stg, start, data);
676 } else {
677 self.stg.read(start, data);
678 }
679 }
680
681 fn size(&self) -> u64 {
683 panic!()
684 }
685
686 fn write(&mut self, _start: u64, _data: &[u8]) {
688 panic!();
689 }
690
691 fn commit(&mut self, _size: u64) {
693 panic!();
694 }
695}
696
697struct ReadBuffer<const N: usize> {
698 map: HashMap<u64, Box<[u8; N]>>,
700 max_buf: usize,
702}
703
704impl<const N: usize> ReadBuffer<N> {
705 fn new(max_buf: usize) -> Self {
706 Self {
707 map: HashMap::default(),
708 max_buf,
709 }
710 }
711
712 fn reset(&mut self) {
713 self.map.clear();
714 }
715
716 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
717 let mut done = 0;
718 while done < data.len() {
719 let off = off + done as u64;
720 let sector = off / N as u64;
721 let disp = (off % N as u64) as usize;
722 let amount = min(data.len() - done, N - disp);
723
724 let p = self.map.entry(sector).or_insert_with(|| {
725 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
726 stg.read(sector * N as u64, &mut *p);
727 p
728 });
729 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
730 done += amount;
731 }
732 if self.map.len() >= self.max_buf {
733 self.reset();
734 }
735 }
736}
737
738#[derive(Default)]
739struct DataSlice {
741 pub data: Data,
743 pub off: usize,
745 pub len: usize,
747}
748
749impl DataSlice {
750 pub fn all(&self) -> &[u8] {
752 &self.data[self.off..self.off + self.len]
753 }
754 pub fn part(&self, off: usize, len: usize) -> &[u8] {
756 &self.data[self.off + off..self.off + off + len]
757 }
758 pub fn trim(&mut self, trim: usize) {
760 self.off += trim;
761 self.len -= trim;
762 }
763 #[allow(dead_code)]
765 pub fn take(&mut self) -> Data {
766 std::mem::take(&mut self.data)
767 }
768}
769
770#[derive(Default)]
771struct WMap {
773 map: BTreeMap<u64, DataSlice>,
775}
776
777impl WMap {
778 pub fn is_empty(&self) -> bool {
780 self.map.is_empty()
781 }
782
783 pub fn len(&self) -> usize {
785 self.map.len()
786 }
787
788 pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
790 let map = std::mem::take(&mut self.map);
791 let mut result = GVec::with_capacity(map.len());
792 for (end, v) in map {
793 let start = end - v.len as u64;
794 result.push((start, v));
795 }
796 result
797 }
798
799 pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
801 for (end, v) in self.map.iter() {
802 let start = end - v.len as u64;
803 stg.write_data(start, v.data.clone(), v.off, v.len);
804 }
805 }
806
807 #[cfg(not(feature = "pstd"))]
808 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
810 if len != 0 {
811 let (mut insert, mut remove) = (Vec::new(), Vec::new());
812 let end = start + len as u64;
813 for (ee, v) in self.map.range_mut(start + 1..) {
814 let ee = *ee;
815 let es = ee - v.len as u64; if es >= end {
817 break;
819 } else if start <= es {
820 if end < ee {
821 v.trim((end - es) as usize);
823 break;
824 }
825 remove.push(ee);
827 } else if end < ee {
828 insert.push((es, v.data.clone(), v.off, (start - es) as usize));
831 v.trim((end - es) as usize);
832 break;
833 } else {
834 insert.push((es, v.take(), v.off, (start - es) as usize));
837 remove.push(ee);
838 }
839 }
840 for end in remove {
841 self.map.remove(&end);
842 }
843 for (start, data, off, len) in insert {
844 self.map
845 .insert(start + len as u64, DataSlice { data, off, len });
846 }
847 self.map
848 .insert(start + len as u64, DataSlice { data, off, len });
849 }
850 }
851
852 #[cfg(feature = "pstd")]
853 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
855 if len != 0 {
856 let end = start + len as u64;
857 let mut c = self
858 .map
859 .lower_bound_mut(std::ops::Bound::Excluded(&start))
860 .with_mutable_key();
861 while let Some((eend, v)) = c.next() {
862 let ee = *eend;
863 let es = ee - v.len as u64; if es >= end {
865 c.prev();
867 break;
868 } else if start <= es {
869 if end < ee {
870 v.trim((end - es) as usize);
872 c.prev();
873 break;
874 }
875 c.remove_prev();
877 } else if end < ee {
878 let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
881 v.trim((end - es) as usize);
882 c.prev();
883 c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
884 break;
885 } else {
886 v.len = (start - es) as usize;
889 *eend = es + v.len as u64;
890 }
891 }
892 c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
894 }
895 }
896
897 pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
899 let len = data.len();
900 if len != 0 {
901 let mut done = 0;
902 for (&end, v) in self.map.range(start + 1..) {
903 let es = end - v.len as u64; let doff = start + done as u64;
905 if es > doff {
906 let a = min(len - done, (es - doff) as usize);
908 u.read(doff, &mut data[done..done + a]);
909 done += a;
910 if done == len {
911 return;
912 }
913 }
914 let skip = (start + done as u64 - es) as usize;
916 let a = min(len - done, v.len - skip);
917 data[done..done + a].copy_from_slice(v.part(skip, a));
918 done += a;
919 if done == len {
920 return;
921 }
922 }
923 u.read(start + done as u64, &mut data[done..]);
924 }
925 }
926}
927
928pub struct BasicAtomicFile {
930 stg: WriteBuffer,
932 upd: WriteBuffer,
934 map: WMap,
936 list: GVec<(u64, DataSlice)>,
938 size: u64,
939 stop: bool,
940}
941
942impl BasicAtomicFile {
943 pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
945 let size = stg.size();
946 let mut result = Box::new(Self {
947 stg: WriteBuffer::new(stg, lim.swbuf),
948 upd: WriteBuffer::new(upd, lim.uwbuf),
949 map: WMap::default(),
950 list: GVec::new(),
951 size,
952 stop: false,
953 });
954 result.init();
955 result
956 }
957
958 fn init(&mut self) {
960 let end = self.upd.stg.read_u64(0);
961 let size = self.upd.stg.read_u64(8);
962 if end == 0 {
963 return;
964 }
965 assert!(end == self.upd.stg.size());
966 let mut pos = 16;
967 while pos < end {
968 let start = self.upd.stg.read_u64(pos);
969 pos += 8;
970 let len = self.upd.stg.read_u64(pos);
971 pos += 8;
972 let mut buf: GVec<u8> = gvec![0; len as usize];
973 self.upd.stg.read(pos, &mut buf);
974 pos += len;
975 self.stg.write(start, &buf);
976 }
977 self.stg.commit(size);
978 self.upd.commit(0);
979 }
980
981 pub fn commit_phase(&mut self, size: u64, phase: u8) {
983 if self.map.is_empty() && self.list.is_empty() {
984 return;
985 }
986 if phase == 1 {
987 self.list = self.map.convert_to_vec();
988
989 self.upd.write_u64(0, 0);
992 self.upd.write_u64(8, size);
993 self.upd.commit(16); let mut stg_written = false;
997 let mut pos: u64 = 16;
998 for (start, v) in self.list.iter() {
999 let (start, len, data) = (*start, v.len as u64, v.all());
1000 if start >= self.size {
1001 stg_written = true;
1003 self.stg.write(start, data);
1004 } else {
1005 self.upd.write_u64(pos, start);
1006 pos += 8;
1007 self.upd.write_u64(pos, len);
1008 pos += 8;
1009 self.upd.write(pos, data);
1010 pos += len;
1011 }
1012 }
1013 if stg_written {
1014 self.stg.commit(size);
1015 }
1016 self.upd.commit(pos); self.upd.write_u64(0, pos);
1020 self.upd.write_u64(8, size);
1021 self.upd.commit(pos);
1022 } else {
1023 for (start, v) in self.list.iter() {
1024 if *start < self.size {
1025 self.stg.write(*start, v.all());
1027 }
1028 }
1029 self.list = GVec::new();
1030 self.stg.commit(size);
1031 self.upd.commit(0);
1032 }
1033 }
1034}
1035
1036impl BasicStorage for BasicAtomicFile {
1037 fn commit(&mut self, size: u64) {
1038 if self.stop { return; }
1039 self.commit_phase(size, 1);
1040 self.commit_phase(size, 2);
1041 self.size = size;
1042 }
1043
1044 fn size(&self) -> u64 {
1045 self.size
1046 }
1047
1048 fn read(&self, start: u64, data: &mut [u8]) {
1049 self.map.read(start, data, &*self.stg.stg);
1050 }
1051
1052 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1053 self.map.write(start, data, off, len);
1054 }
1055
1056 fn write(&mut self, start: u64, data: &[u8]) {
1057 let len = data.len();
1058 let d = Arc::new(data.to_vec());
1059 self.write_data(start, d, 0, len);
1060 }
1061
1062 fn shutdown(&mut self)
1063 {
1064 self.stop = true;
1065 }
1066}
1067
1068#[cfg(target_family = "unix")]
1070pub struct UnixFileStorage {
1071 size: Arc<Mutex<u64>>,
1072 f: fs::File,
1073}
1074#[cfg(target_family = "unix")]
1075impl UnixFileStorage {
1076 pub fn new(filename: &str) -> Box<Self> {
1078 let mut f = OpenOptions::new()
1079 .read(true)
1080 .write(true)
1081 .create(true)
1082 .truncate(false)
1083 .open(filename)
1084 .unwrap();
1085 let size = f.seek(SeekFrom::End(0)).unwrap();
1086 let size = Arc::new(Mutex::new(size));
1087 Box::new(Self { size, f })
1088 }
1089}
1090
1091#[cfg(target_family = "unix")]
1092impl Storage for UnixFileStorage {
1093 fn clone(&self) -> Box<dyn Storage> {
1094 Box::new(Self {
1095 size: self.size.clone(),
1096 f: self.f.try_clone().unwrap(),
1097 })
1098 }
1099}
1100
1101#[cfg(target_family = "unix")]
1102use std::os::unix::fs::FileExt;
1103
1104#[cfg(target_family = "unix")]
1105impl BasicStorage for UnixFileStorage {
1106 fn read(&self, start: u64, data: &mut [u8]) {
1107 let _ = self.f.read_at(data, start);
1108 }
1109
1110 fn write(&mut self, start: u64, data: &[u8]) {
1111 let _ = self.f.write_at(data, start);
1112 }
1113
1114 fn size(&self) -> u64 {
1115 *self.size.lock().unwrap()
1116 }
1117
1118 fn commit(&mut self, size: u64) {
1119 *self.size.lock().unwrap() = size;
1120 self.f.set_len(size).unwrap();
1121 self.f.sync_all().unwrap();
1122 }
1123}
1124
1125#[cfg(target_family = "windows")]
1127pub struct WindowsFileStorage {
1128 size: Arc<Mutex<u64>>,
1129 f: fs::File,
1130}
1131#[cfg(target_family = "windows")]
1132impl WindowsFileStorage {
1133 pub fn new(filename: &str) -> Box<Self> {
1135 let mut f = OpenOptions::new()
1136 .read(true)
1137 .write(true)
1138 .create(true)
1139 .truncate(false)
1140 .open(filename)
1141 .unwrap();
1142 let size = f.seek(SeekFrom::End(0)).unwrap();
1143 let size = Arc::new(Mutex::new(size));
1144 Box::new(Self { size, f })
1145 }
1146}
1147
1148#[cfg(target_family = "windows")]
1149impl Storage for WindowsFileStorage {
1150 fn clone(&self) -> Box<dyn Storage> {
1151 Box::new(Self {
1152 size: self.size.clone(),
1153 f: self.f.try_clone().unwrap(),
1154 })
1155 }
1156}
1157
1158#[cfg(target_family = "windows")]
1159use std::os::windows::fs::FileExt;
1160
1161#[cfg(target_family = "windows")]
1162impl BasicStorage for WindowsFileStorage {
1163 fn read(&self, start: u64, data: &mut [u8]) {
1164 let _ = self.f.seek_read(data, start);
1165 }
1166
1167 fn write(&mut self, start: u64, data: &[u8]) {
1168 let _ = self.f.seek_write(data, start);
1169 }
1170
1171 fn size(&self) -> u64 {
1172 *self.size.lock().unwrap()
1173 }
1174
1175 fn commit(&mut self, size: u64) {
1176 *self.size.lock().unwrap() = size;
1177 self.f.set_len(size).unwrap();
1178 self.f.sync_all().unwrap();
1179 }
1180}
1181
1182#[cfg(target_family = "windows")]
1184pub type MultiFileStorage = WindowsFileStorage;
1185
1186#[cfg(target_family = "unix")]
1188pub type MultiFileStorage = UnixFileStorage;
1189
1190#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1192pub type MultiFileStorage = AnyFileStorage;
1193
1194#[cfg(any(target_family = "windows", target_family = "unix"))]
1196pub type FastFileStorage = MultiFileStorage;
1197
1198#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1200pub type FastFileStorage = UpdFileStorage;
1201
1202#[cfg(test)]
1203fn test_amount() -> usize {
1205 str::parse(&std::env::var("TA").unwrap_or("1".to_string())).unwrap()
1206}
1207
1208#[test]
1209fn test_atomic_file() {
1210 use rand::Rng;
1211 let ta = test_amount();
1214 println!(" Test amount={}", ta);
1215
1216 let mut rng = rand::thread_rng();
1217
1218 for _ in 0..100 {
1219 let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
1220 let mut s2 = MemFile::new();
1222
1223 for _ in 0..1000 * ta {
1224 let off: usize = rng.r#gen::<usize>() % 100;
1225 let mut len = 1 + rng.r#gen::<usize>() % 20;
1226 let w: bool = rng.r#gen();
1227 if w {
1228 let mut bytes = Vec::new();
1229 while len > 0 {
1230 len -= 1;
1231 let b: u8 = rng.r#gen::<u8>();
1232 bytes.push(b);
1233 }
1234 s1.write(off as u64, &bytes);
1235 s2.write(off as u64, &bytes);
1236 } else {
1237 let mut b2 = vec![0; len];
1238 let mut b3 = vec![0; len];
1239 s1.read(off as u64, &mut b2);
1240 s2.read(off as u64, &mut b3);
1241 assert!(b2 == b3);
1242 }
1243 if rng.r#gen::<usize>() % 50 == 0 {
1244 s1.commit(200);
1245 s2.commit(200);
1246 }
1247 }
1248 }
1249}