1#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24 VecA,
25 collections::{BTreeMapA, btree_map::CustomTuning},
26 localalloc::GTemp,
27 veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
33#[cfg(feature = "pstd")]
34type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;
35
36#[cfg(feature = "pstd")]
37type GVec<T> = VecA<T, GTemp>;
38
39pub type Data = Arc<Vec<u8>>;
41
42pub struct AtomicFile {
59 map: WMap,
61 cf: Arc<RwLock<CommitFile>>,
63 size: u64,
65 tx: std::sync::mpsc::Sender<(u64, WMap)>,
67 busy: Arc<Mutex<()>>,
69 map_lim: usize,
71}
72
73impl AtomicFile {
74 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76 Self::new_with_limits(stg, upd, &Limits::default())
77 }
78
79 pub fn new_with_limits(
81 stg: Box<dyn Storage>,
82 upd: Box<dyn BasicStorage>,
83 lim: &Limits,
84 ) -> Box<Self> {
85 let size = stg.size();
86 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
94
95 std::thread::spawn(move || {
96 while let Ok((size, map)) = rx.recv() {
98 let _lock = busy1.lock();
99 baf.map = map;
100 baf.commit(size);
101 cf1.write().unwrap().done_one();
102 }
103 });
104 Box::new(Self {
105 map: WMap::default(),
106 cf,
107 size,
108 tx,
109 busy,
110 map_lim: lim.map_lim,
111 })
112 }
113}
114
115impl Storage for AtomicFile {
116 fn clone(&self) -> Box<dyn Storage> {
117 panic!()
118 }
119}
120
121impl BasicStorage for AtomicFile {
122 fn commit(&mut self, size: u64) {
123 self.size = size;
124 if self.map.is_empty() {
125 return;
126 }
127 if self.cf.read().unwrap().map.len() > self.map_lim {
128 self.wait_complete();
129 }
130 let map = std::mem::take(&mut self.map);
131 let stop =
132 {
133 let cf = &mut *self.cf.write().unwrap();
134 if cf.stop { true }
135 else
136 {
137 cf.todo += 1;
138 map.to_storage(cf);
140 self.tx.send((size, map)).unwrap();
142 false
143 }
144 };
145 if stop {
146 loop { std::thread::sleep(std::time::Duration::from_millis(100)); }
148 }
149 }
150
151 fn size(&self) -> u64 {
152 self.size
153 }
154
155 fn read(&self, start: u64, data: &mut [u8]) {
156 self.map.read(start, data, &*self.cf.read().unwrap());
157 }
158
159 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
160 self.map.write(start, data, off, len);
161 }
162
163 fn write(&mut self, start: u64, data: &[u8]) {
164 let len = data.len();
165 let d = Arc::new(data.to_vec());
166 self.write_data(start, d, 0, len);
167 }
168
169 fn wait_complete(&self) {
170 while self.cf.read().unwrap().todo != 0 {
171 std::thread::yield_now();
172 let _x = self.busy.lock();
173 }
174 }
175
176 fn shutdown(&mut self) {
177 self.cf.write().unwrap().stop = true; self.wait_complete();
179 }
180}
181
182struct CommitFile {
183 stg: ReadBufStg<256>,
185 map: WMap,
187 todo: usize,
189 stop: bool,
191}
192
193impl CommitFile {
194 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
195 Self {
196 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
197 map: WMap::default(),
198 todo: 0,
199 stop: false,
200 }
201 }
202
203 fn done_one(&mut self) {
204 self.todo -= 1;
205 if self.todo == 0 {
206 self.map = WMap::default();
207 self.stg.reset();
208 }
209 }
210}
211
212impl BasicStorage for CommitFile {
213 fn commit(&mut self, _size: u64) {
214 panic!()
215 }
216
217 fn size(&self) -> u64 {
218 panic!()
219 }
220
221 fn read(&self, start: u64, data: &mut [u8]) {
222 self.map.read(start, data, &self.stg);
223 }
224
225 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
226 self.map.write(start, data, off, len);
227 }
228
229 fn write(&mut self, _start: u64, _data: &[u8]) {
230 panic!()
231 }
232}
233
234pub trait BasicStorage: Send {
238 fn size(&self) -> u64;
241
242 fn read(&self, start: u64, data: &mut [u8]);
244
245 fn write(&mut self, start: u64, data: &[u8]);
247
248 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
250 let len = data.len();
251 let d = Arc::new(data);
252 self.write_data(start, d, 0, len);
253 }
254
255 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
257 self.write(start, &data[off..off + len]);
258 }
259
260 fn commit(&mut self, size: u64);
262
263 fn write_u64(&mut self, start: u64, value: u64) {
265 self.write(start, &value.to_le_bytes());
266 }
267
268 fn read_u64(&self, start: u64) -> u64 {
270 let mut bytes = [0; 8];
271 self.read(start, &mut bytes);
272 u64::from_le_bytes(bytes)
273 }
274
275 fn wait_complete(&self){}
277
278 fn shutdown(&mut self){}
280}
281
282pub trait Storage: BasicStorage + Sync {
284 fn clone(&self) -> Box<dyn Storage>;
286}
287
288#[derive(Default)]
290pub struct MemFile {
291 v: Arc<Mutex<Vec<u8>>>,
292}
293
294impl MemFile {
295 pub fn new() -> Box<Self> {
297 Box::default()
298 }
299}
300
301impl Storage for MemFile {
302 fn clone(&self) -> Box<dyn Storage> {
303 Box::new(Self { v: self.v.clone() })
304 }
305}
306
307impl BasicStorage for MemFile {
308 fn size(&self) -> u64 {
309 let v = self.v.lock().unwrap();
310 v.len() as u64
311 }
312
313 fn read(&self, off: u64, bytes: &mut [u8]) {
314 let off = off as usize;
315 let len = bytes.len();
316 let mut v = self.v.lock().unwrap();
317 if off + len > v.len() {
318 v.resize(off + len, 0);
319 }
320 bytes.copy_from_slice(&v[off..off + len]);
321 }
322
323 fn write(&mut self, off: u64, bytes: &[u8]) {
324 let off = off as usize;
325 let len = bytes.len();
326 let mut v = self.v.lock().unwrap();
327 if off + len > v.len() {
328 v.resize(off + len, 0);
329 }
330 v[off..off + len].copy_from_slice(bytes);
331 }
332
333 fn commit(&mut self, size: u64) {
334 let mut v = self.v.lock().unwrap();
335 v.resize(size as usize, 0);
336 }
337}
338
339use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
340
341struct FileInner {
342 f: fs::File,
343}
344
345impl FileInner {
346 pub fn new(filename: &str) -> Self {
348 Self {
349 f: OpenOptions::new()
350 .read(true)
351 .write(true)
352 .create(true)
353 .truncate(false)
354 .open(filename)
355 .unwrap(),
356 }
357 }
358
359 fn size(&mut self) -> u64 {
360 self.f.seek(SeekFrom::End(0)).unwrap()
361 }
362
363 fn read(&mut self, off: u64, bytes: &mut [u8]) {
364 self.f.seek(SeekFrom::Start(off)).unwrap();
365 let _ = self.f.read(bytes).unwrap();
366 }
367
368 fn write(&mut self, off: u64, bytes: &[u8]) {
369 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
371 {
372 let size = self.f.seek(SeekFrom::End(0)).unwrap();
373 if off > size {
374 self.f.set_len(off).unwrap();
375 }
376 }
377 self.f.seek(SeekFrom::Start(off)).unwrap();
378 let _ = self.f.write(bytes).unwrap();
379 }
380
381 fn commit(&mut self, size: u64) {
382 self.f.set_len(size).unwrap();
383 self.f.sync_all().unwrap();
384 }
385}
386
387pub struct UpdFileStorage {
389 file: Cell<Option<FileInner>>,
390}
391
392impl UpdFileStorage {
393 pub fn new(filename: &str) -> Box<Self> {
395 Box::new(Self {
396 file: Cell::new(Some(FileInner::new(filename))),
397 })
398 }
399}
400
401impl BasicStorage for UpdFileStorage {
402 fn size(&self) -> u64 {
403 let mut f = self.file.take().unwrap();
404 let result = f.size();
405 self.file.set(Some(f));
406 result
407 }
408 fn read(&self, off: u64, bytes: &mut [u8]) {
409 let mut f = self.file.take().unwrap();
410 f.read(off, bytes);
411 self.file.set(Some(f));
412 }
413
414 fn write(&mut self, off: u64, bytes: &[u8]) {
415 let mut f = self.file.take().unwrap();
416 f.write(off, bytes);
417 self.file.set(Some(f));
418 }
419
420 fn commit(&mut self, size: u64) {
421 let mut f = self.file.take().unwrap();
422 f.commit(size);
423 self.file.set(Some(f));
424 }
425}
426
427pub struct SimpleFileStorage {
429 file: Arc<Mutex<FileInner>>,
430}
431
432impl SimpleFileStorage {
433 pub fn new(filename: &str) -> Box<Self> {
435 Box::new(Self {
436 file: Arc::new(Mutex::new(FileInner::new(filename))),
437 })
438 }
439}
440
441impl Storage for SimpleFileStorage {
442 fn clone(&self) -> Box<dyn Storage> {
443 Box::new(Self {
444 file: self.file.clone(),
445 })
446 }
447}
448
449impl BasicStorage for SimpleFileStorage {
450 fn size(&self) -> u64 {
451 self.file.lock().unwrap().size()
452 }
453
454 fn read(&self, off: u64, bytes: &mut [u8]) {
455 self.file.lock().unwrap().read(off, bytes);
456 }
457
458 fn write(&mut self, off: u64, bytes: &[u8]) {
459 self.file.lock().unwrap().write(off, bytes);
460 }
461
462 fn commit(&mut self, size: u64) {
463 self.file.lock().unwrap().commit(size);
464 }
465}
466
467pub struct AnyFileStorage {
469 filename: String,
470 files: Arc<Mutex<Vec<FileInner>>>,
471}
472
473impl AnyFileStorage {
474 pub fn new(filename: &str) -> Box<Self> {
476 Box::new(Self {
477 filename: filename.to_owned(),
478 files: Arc::new(Mutex::new(Vec::new())),
479 })
480 }
481
482 fn get_file(&self) -> FileInner {
483 match self.files.lock().unwrap().pop() {
484 Some(f) => f,
485 _ => FileInner::new(&self.filename),
486 }
487 }
488
489 fn put_file(&self, f: FileInner) {
490 self.files.lock().unwrap().push(f);
491 }
492}
493
494impl Storage for AnyFileStorage {
495 fn clone(&self) -> Box<dyn Storage> {
496 Box::new(Self {
497 filename: self.filename.clone(),
498 files: self.files.clone(),
499 })
500 }
501}
502
503impl BasicStorage for AnyFileStorage {
504 fn size(&self) -> u64 {
505 let mut f = self.get_file();
506 let result = f.size();
507 self.put_file(f);
508 result
509 }
510
511 fn read(&self, off: u64, bytes: &mut [u8]) {
512 let mut f = self.get_file();
513 f.read(off, bytes);
514 self.put_file(f);
515 }
516
517 fn write(&mut self, off: u64, bytes: &[u8]) {
518 let mut f = self.get_file();
519 f.write(off, bytes);
520 self.put_file(f);
521 }
522
523 fn commit(&mut self, size: u64) {
524 let mut f = self.get_file();
525 f.commit(size);
526 self.put_file(f);
527 }
528}
529
530pub struct DummyFile {}
532impl DummyFile {
533 pub fn new() -> Box<Self> {
535 Box::new(Self {})
536 }
537}
538
539impl Storage for DummyFile {
540 fn clone(&self) -> Box<dyn Storage> {
541 Self::new()
542 }
543}
544
545impl BasicStorage for DummyFile {
546 fn size(&self) -> u64 {
547 0
548 }
549
550 fn read(&self, _off: u64, _bytes: &mut [u8]) {}
551
552 fn write(&mut self, _off: u64, _bytes: &[u8]) {}
553
554 fn commit(&mut self, _size: u64) {}
555}
556
557#[non_exhaustive]
559pub struct Limits {
560 pub map_lim: usize,
562 pub rbuf_mem: usize,
564 pub swbuf: usize,
566 pub uwbuf: usize,
568}
569
570impl Default for Limits {
571 fn default() -> Self {
572 Self {
573 map_lim: 5000,
574 rbuf_mem: 0x200000,
575 swbuf: 0x100000,
576 uwbuf: 0x100000,
577 }
578 }
579}
580
581struct WriteBuffer {
583 ix: usize,
585 pos: u64,
587 pub stg: Box<dyn BasicStorage>,
589 buf: Vec<u8>,
591}
592
593impl WriteBuffer {
594 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
596 Self {
597 ix: 0,
598 pos: u64::MAX,
599 stg,
600 buf: vec![0; buf_size],
601 }
602 }
603
604 pub fn write(&mut self, off: u64, data: &[u8]) {
606 if self.pos + self.ix as u64 != off {
607 self.flush(off);
608 }
609 let mut done: usize = 0;
610 let mut todo: usize = data.len();
611 while todo > 0 {
612 let mut n: usize = self.buf.len() - self.ix;
613 if n == 0 {
614 self.flush(off + done as u64);
615 n = self.buf.len();
616 }
617 if n > todo {
618 n = todo;
619 }
620 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
621 todo -= n;
622 done += n;
623 self.ix += n;
624 }
625 }
626
627 fn flush(&mut self, new_pos: u64) {
628 if self.ix > 0 {
629 self.stg.write(self.pos, &self.buf[0..self.ix]);
630 }
631 self.ix = 0;
632 self.pos = new_pos;
633 }
634
635 pub fn commit(&mut self, size: u64) {
637 self.flush(u64::MAX);
638 self.stg.commit(size);
639 }
640
641 pub fn write_u64(&mut self, start: u64, value: u64) {
643 self.write(start, &value.to_le_bytes());
644 }
645}
646
647struct ReadBufStg<const N: usize> {
653 stg: Box<dyn Storage>,
655 buf: Mutex<ReadBuffer<N>>,
657 limit: usize,
659}
660
661impl<const N: usize> Drop for ReadBufStg<N> {
662 fn drop(&mut self) {
663 self.reset();
664 }
665}
666
667impl<const N: usize> ReadBufStg<N> {
668 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
670 Self {
671 stg,
672 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
673 limit,
674 }
675 }
676
677 fn reset(&mut self) {
679 self.buf.lock().unwrap().reset();
680 }
681}
682
683impl<const N: usize> BasicStorage for ReadBufStg<N> {
684 fn read(&self, start: u64, data: &mut [u8]) {
686 if data.len() <= self.limit {
687 self.buf.lock().unwrap().read(&*self.stg, start, data);
688 } else {
689 self.stg.read(start, data);
690 }
691 }
692
693 fn size(&self) -> u64 {
695 panic!()
696 }
697
698 fn write(&mut self, _start: u64, _data: &[u8]) {
700 panic!();
701 }
702
703 fn commit(&mut self, _size: u64) {
705 panic!();
706 }
707}
708
709struct ReadBuffer<const N: usize> {
710 map: HashMap<u64, Box<[u8; N]>>,
712 max_buf: usize,
714}
715
716impl<const N: usize> ReadBuffer<N> {
717 fn new(max_buf: usize) -> Self {
718 Self {
719 map: HashMap::default(),
720 max_buf,
721 }
722 }
723
724 fn reset(&mut self) {
725 self.map.clear();
726 }
727
728 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
729 let mut done = 0;
730 while done < data.len() {
731 let off = off + done as u64;
732 let sector = off / N as u64;
733 let disp = (off % N as u64) as usize;
734 let amount = min(data.len() - done, N - disp);
735
736 let p = self.map.entry(sector).or_insert_with(|| {
737 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
738 stg.read(sector * N as u64, &mut *p);
739 p
740 });
741 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
742 done += amount;
743 }
744 if self.map.len() >= self.max_buf {
745 self.reset();
746 }
747 }
748}
749
750#[derive(Default)]
751struct DataSlice {
753 pub data: Data,
755 pub off: usize,
757 pub len: usize,
759}
760
761impl DataSlice {
762 pub fn all(&self) -> &[u8] {
764 &self.data[self.off..self.off + self.len]
765 }
766 pub fn part(&self, off: usize, len: usize) -> &[u8] {
768 &self.data[self.off + off..self.off + off + len]
769 }
770 pub fn trim(&mut self, trim: usize) {
772 self.off += trim;
773 self.len -= trim;
774 }
775 #[allow(dead_code)]
777 pub fn take(&mut self) -> Data {
778 std::mem::take(&mut self.data)
779 }
780}
781
782#[derive(Default)]
783struct WMap {
785 map: BTreeMap<u64, DataSlice>,
787}
788
789impl WMap {
790 pub fn is_empty(&self) -> bool {
792 self.map.is_empty()
793 }
794
795 pub fn len(&self) -> usize {
797 self.map.len()
798 }
799
800 pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
802 let map = std::mem::take(&mut self.map);
803 let mut result = GVec::with_capacity(map.len());
804 for (end, v) in map {
805 let start = end - v.len as u64;
806 result.push((start, v));
807 }
808 result
809 }
810
811 pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
813 for (end, v) in self.map.iter() {
814 let start = end - v.len as u64;
815 stg.write_data(start, v.data.clone(), v.off, v.len);
816 }
817 }
818
819 #[cfg(not(feature = "pstd"))]
820 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
822 if len != 0 {
823 let (mut insert, mut remove) = (Vec::new(), Vec::new());
824 let end = start + len as u64;
825 for (ee, v) in self.map.range_mut(start + 1..) {
826 let ee = *ee;
827 let es = ee - v.len as u64; if es >= end {
829 break;
831 } else if start <= es {
832 if end < ee {
833 v.trim((end - es) as usize);
835 break;
836 }
837 remove.push(ee);
839 } else if end < ee {
840 insert.push((es, v.data.clone(), v.off, (start - es) as usize));
843 v.trim((end - es) as usize);
844 break;
845 } else {
846 insert.push((es, v.take(), v.off, (start - es) as usize));
849 remove.push(ee);
850 }
851 }
852 for end in remove {
853 self.map.remove(&end);
854 }
855 for (start, data, off, len) in insert {
856 self.map
857 .insert(start + len as u64, DataSlice { data, off, len });
858 }
859 self.map
860 .insert(start + len as u64, DataSlice { data, off, len });
861 }
862 }
863
864 #[cfg(feature = "pstd")]
865 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
867 if len != 0 {
868 let end = start + len as u64;
869 let mut c = self
870 .map
871 .lower_bound_mut(std::ops::Bound::Excluded(&start))
872 .with_mutable_key();
873 while let Some((eend, v)) = c.next() {
874 let ee = *eend;
875 let es = ee - v.len as u64; if es >= end {
877 c.prev();
879 break;
880 } else if start <= es {
881 if end < ee {
882 v.trim((end - es) as usize);
884 c.prev();
885 break;
886 }
887 c.remove_prev();
889 } else if end < ee {
890 let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
893 v.trim((end - es) as usize);
894 c.prev();
895 c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
896 break;
897 } else {
898 v.len = (start - es) as usize;
901 *eend = es + v.len as u64;
902 }
903 }
904 c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
906 }
907 }
908
909 pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
911 let len = data.len();
912 if len != 0 {
913 let mut done = 0;
914 for (&end, v) in self.map.range(start + 1..) {
915 let es = end - v.len as u64; let doff = start + done as u64;
917 if es > doff {
918 let a = min(len - done, (es - doff) as usize);
920 u.read(doff, &mut data[done..done + a]);
921 done += a;
922 if done == len {
923 return;
924 }
925 }
926 let skip = (start + done as u64 - es) as usize;
928 let a = min(len - done, v.len - skip);
929 data[done..done + a].copy_from_slice(v.part(skip, a));
930 done += a;
931 if done == len {
932 return;
933 }
934 }
935 u.read(start + done as u64, &mut data[done..]);
936 }
937 }
938}
939
940pub struct BasicAtomicFile {
942 stg: WriteBuffer,
944 upd: WriteBuffer,
946 map: WMap,
948 list: GVec<(u64, DataSlice)>,
950 size: u64,
951 stop: bool,
952}
953
954impl BasicAtomicFile {
955 pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
957 let size = stg.size();
958 let mut result = Box::new(Self {
959 stg: WriteBuffer::new(stg, lim.swbuf),
960 upd: WriteBuffer::new(upd, lim.uwbuf),
961 map: WMap::default(),
962 list: GVec::new(),
963 size,
964 stop: false,
965 });
966 result.init();
967 result
968 }
969
970 fn init(&mut self) {
972 let end = self.upd.stg.read_u64(0);
973 let size = self.upd.stg.read_u64(8);
974 if end == 0 {
975 return;
976 }
977 assert!(end == self.upd.stg.size());
978 let mut pos = 16;
979 while pos < end {
980 let start = self.upd.stg.read_u64(pos);
981 pos += 8;
982 let len = self.upd.stg.read_u64(pos);
983 pos += 8;
984 let mut buf: GVec<u8> = gvec![0; len as usize];
985 self.upd.stg.read(pos, &mut buf);
986 pos += len;
987 self.stg.write(start, &buf);
988 }
989 self.stg.commit(size);
990 self.upd.commit(0);
991 }
992
993 pub fn commit_phase(&mut self, size: u64, phase: u8) {
995 if self.map.is_empty() && self.list.is_empty() {
996 return;
997 }
998 if phase == 1 {
999 self.list = self.map.convert_to_vec();
1000
1001 self.upd.write_u64(0, 0);
1004 self.upd.write_u64(8, size);
1005 self.upd.commit(16); let mut stg_written = false;
1009 let mut pos: u64 = 16;
1010 for (start, v) in self.list.iter() {
1011 let (start, len, data) = (*start, v.len as u64, v.all());
1012 if start >= self.size {
1013 stg_written = true;
1015 self.stg.write(start, data);
1016 } else {
1017 self.upd.write_u64(pos, start);
1018 pos += 8;
1019 self.upd.write_u64(pos, len);
1020 pos += 8;
1021 self.upd.write(pos, data);
1022 pos += len;
1023 }
1024 }
1025 if stg_written {
1026 self.stg.commit(size);
1027 }
1028 self.upd.commit(pos); self.upd.write_u64(0, pos);
1032 self.upd.write_u64(8, size);
1033 self.upd.commit(pos);
1034 } else {
1035 for (start, v) in self.list.iter() {
1036 if *start < self.size {
1037 self.stg.write(*start, v.all());
1039 }
1040 }
1041 self.list = GVec::new();
1042 self.stg.commit(size);
1043 self.upd.commit(0);
1044 }
1045 }
1046}
1047
1048impl BasicStorage for BasicAtomicFile {
1049 fn commit(&mut self, size: u64) {
1050 if self.stop { return; }
1051 self.commit_phase(size, 1);
1052 self.commit_phase(size, 2);
1053 self.size = size;
1054 }
1055
1056 fn size(&self) -> u64 {
1057 self.size
1058 }
1059
1060 fn read(&self, start: u64, data: &mut [u8]) {
1061 self.map.read(start, data, &*self.stg.stg);
1062 }
1063
1064 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1065 self.map.write(start, data, off, len);
1066 }
1067
1068 fn write(&mut self, start: u64, data: &[u8]) {
1069 let len = data.len();
1070 let d = Arc::new(data.to_vec());
1071 self.write_data(start, d, 0, len);
1072 }
1073
1074 fn shutdown(&mut self)
1075 {
1076 self.stop = true;
1077 }
1078}
1079
1080#[cfg(target_family = "unix")]
1082pub struct UnixFileStorage {
1083 size: Arc<Mutex<u64>>,
1084 f: fs::File,
1085}
1086#[cfg(target_family = "unix")]
1087impl UnixFileStorage {
1088 pub fn new(filename: &str) -> Box<Self> {
1090 let mut f = OpenOptions::new()
1091 .read(true)
1092 .write(true)
1093 .create(true)
1094 .truncate(false)
1095 .open(filename)
1096 .unwrap();
1097 let size = f.seek(SeekFrom::End(0)).unwrap();
1098 let size = Arc::new(Mutex::new(size));
1099 Box::new(Self { size, f })
1100 }
1101}
1102
1103#[cfg(target_family = "unix")]
1104impl Storage for UnixFileStorage {
1105 fn clone(&self) -> Box<dyn Storage> {
1106 Box::new(Self {
1107 size: self.size.clone(),
1108 f: self.f.try_clone().unwrap(),
1109 })
1110 }
1111}
1112
1113#[cfg(target_family = "unix")]
1114use std::os::unix::fs::FileExt;
1115
1116#[cfg(target_family = "unix")]
1117impl BasicStorage for UnixFileStorage {
1118 fn read(&self, start: u64, data: &mut [u8]) {
1119 let _ = self.f.read_at(data, start);
1120 }
1121
1122 fn write(&mut self, start: u64, data: &[u8]) {
1123 let _ = self.f.write_at(data, start);
1124 }
1125
1126 fn size(&self) -> u64 {
1127 *self.size.lock().unwrap()
1128 }
1129
1130 fn commit(&mut self, size: u64) {
1131 *self.size.lock().unwrap() = size;
1132 self.f.set_len(size).unwrap();
1133 self.f.sync_all().unwrap();
1134 }
1135}
1136
1137#[cfg(target_family = "windows")]
1139pub struct WindowsFileStorage {
1140 size: Arc<Mutex<u64>>,
1141 f: fs::File,
1142}
1143#[cfg(target_family = "windows")]
1144impl WindowsFileStorage {
1145 pub fn new(filename: &str) -> Box<Self> {
1147 let mut f = OpenOptions::new()
1148 .read(true)
1149 .write(true)
1150 .create(true)
1151 .truncate(false)
1152 .open(filename)
1153 .unwrap();
1154 let size = f.seek(SeekFrom::End(0)).unwrap();
1155 let size = Arc::new(Mutex::new(size));
1156 Box::new(Self { size, f })
1157 }
1158}
1159
1160#[cfg(target_family = "windows")]
1161impl Storage for WindowsFileStorage {
1162 fn clone(&self) -> Box<dyn Storage> {
1163 Box::new(Self {
1164 size: self.size.clone(),
1165 f: self.f.try_clone().unwrap(),
1166 })
1167 }
1168}
1169
1170#[cfg(target_family = "windows")]
1171use std::os::windows::fs::FileExt;
1172
1173#[cfg(target_family = "windows")]
1174impl BasicStorage for WindowsFileStorage {
1175 fn read(&self, start: u64, data: &mut [u8]) {
1176 let _ = self.f.seek_read(data, start);
1177 }
1178
1179 fn write(&mut self, start: u64, data: &[u8]) {
1180 let _ = self.f.seek_write(data, start);
1181 }
1182
1183 fn size(&self) -> u64 {
1184 *self.size.lock().unwrap()
1185 }
1186
1187 fn commit(&mut self, size: u64) {
1188 *self.size.lock().unwrap() = size;
1189 self.f.set_len(size).unwrap();
1190 self.f.sync_all().unwrap();
1191 }
1192}
1193
1194#[cfg(target_family = "windows")]
1196pub type MultiFileStorage = WindowsFileStorage;
1197
1198#[cfg(target_family = "unix")]
1200pub type MultiFileStorage = UnixFileStorage;
1201
1202#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1204pub type MultiFileStorage = AnyFileStorage;
1205
1206#[cfg(any(target_family = "windows", target_family = "unix"))]
1208pub type FastFileStorage = MultiFileStorage;
1209
1210#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1212pub type FastFileStorage = UpdFileStorage;
1213
1214#[cfg(test)]
1215fn test_amount() -> usize {
1217 str::parse(&std::env::var("TA").unwrap_or("1".to_string())).unwrap()
1218}
1219
1220#[test]
1221fn test_atomic_file() {
1222 use rand::Rng;
1223 let ta = test_amount();
1226 println!(" Test amount={}", ta);
1227
1228 let mut rng = rand::thread_rng();
1229
1230 for _ in 0..100 {
1231 let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
1232 let mut s2 = MemFile::new();
1234
1235 for _ in 0..1000 * ta {
1236 let off: usize = rng.r#gen::<usize>() % 100;
1237 let mut len = 1 + rng.r#gen::<usize>() % 20;
1238 let w: bool = rng.r#gen();
1239 if w {
1240 let mut bytes = Vec::new();
1241 while len > 0 {
1242 len -= 1;
1243 let b: u8 = rng.r#gen::<u8>();
1244 bytes.push(b);
1245 }
1246 s1.write(off as u64, &bytes);
1247 s2.write(off as u64, &bytes);
1248 } else {
1249 let mut b2 = vec![0; len];
1250 let mut b3 = vec![0; len];
1251 s1.read(off as u64, &mut b2);
1252 s2.read(off as u64, &mut b3);
1253 assert!(b2 == b3);
1254 }
1255 if rng.r#gen::<usize>() % 50 == 0 {
1256 s1.commit(200);
1257 s2.commit(200);
1258 }
1259 }
1260 }
1261}