1#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24 VecA,
25 collections::{BTreeMapA, btree_map::CustomTuning},
26 localalloc::GTemp,
27 veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
/// Ordered map used by this file when the `pstd` feature is enabled:
/// `pstd`'s `BTreeMapA` parameterised with `CustomTuning<GTemp>`.
#[cfg(feature = "pstd")]
type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;

/// Growable vector used by this file when the `pstd` feature is enabled:
/// `pstd`'s `VecA` parameterised with `GTemp`.
#[cfg(feature = "pstd")]
type GVec<T> = VecA<T, GTemp>;
38
/// Shared, reference-counted byte buffer (`Arc<Vec<u8>>`) passed to [`BasicStorage::write_data`].
pub type Data = Arc<Vec<u8>>;
41
/// Storage wrapper that collects writes in memory and hands each commit to a background thread.
///
/// Writes accumulate in `map`. On commit they are copied into the shared `CommitFile`
/// (so that reads still see them) and sent over `tx` to the thread spawned by
/// `new_with_limits`, which applies them through a `BasicAtomicFile`.
pub struct AtomicFile {
    /// Writes made since the last commit.
    map: WMap,
    /// Shared record of committed writes not yet applied, layered over the underlying storage.
    cf: Arc<RwLock<CommitFile>>,
    /// Size recorded at construction and updated by every `commit`.
    size: u64,
    /// Sends `(size, writes)` pairs to the background commit thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Mutex the background thread holds while it processes one commit.
    busy: Arc<Mutex<()>>,
    /// If the pending map in `cf` holds more entries than this, `commit` first waits for the backlog.
    map_lim: usize,
}
72
73impl AtomicFile {
74 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76 Self::new_with_limits(stg, upd, &Limits::default())
77 }
78
79 pub fn new_with_limits(
81 stg: Box<dyn Storage>,
82 upd: Box<dyn BasicStorage>,
83 lim: &Limits,
84 ) -> Box<Self> {
85 let size = stg.size();
86 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
94
95 std::thread::spawn(move || {
96 while let Ok((size, map)) = rx.recv() {
98 let _lock = busy1.lock();
99 baf.map = map;
100 baf.commit(size);
101 cf1.write().unwrap().done_one();
102 }
103 });
104 Box::new(Self {
105 map: WMap::default(),
106 cf,
107 size,
108 tx,
109 busy,
110 map_lim: lim.map_lim,
111 })
112 }
113}
114
impl Storage for AtomicFile {
    /// Not supported for `AtomicFile`: this implementation always panics.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }
}
120
impl BasicStorage for AtomicFile {
    /// Record `size` and queue the writes made since the last commit for the background thread.
    ///
    /// Returns immediately when there are no outstanding writes. Panics if the
    /// channel to the background thread is closed or a lock is poisoned.
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Too many entries waiting in the shared map: let the background thread catch up first.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let stop =
        {
            // The write lock is released at the end of this block, before the loop below.
            let cf = &mut *self.cf.write().unwrap();
            if cf.stop { true }
            else
            {
                cf.todo += 1;
                // Copy the writes into the shared map so reads keep seeing them until applied.
                map.to_storage(cf);
                self.tx.send((size, map)).unwrap();
                false
            }
        };
        // `stop` is never modified inside the loop, so after `shutdown` this call never returns.
        // NOTE(review): presumably intentional, so a commit after shutdown is never reported
        // as having succeeded — confirm.
        while stop {
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
    }

    /// Size recorded at construction or by the most recent `commit`.
    fn size(&self) -> u64 {
        self.size
    }

    /// Read from the uncommitted writes, falling back to the shared `CommitFile` for gaps.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    /// Record a write of `data[off..off + len]` at `start` in the uncommitted map.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Copy `data` into a new shared buffer and record it as a write at `start`.
    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    /// Block until the count of outstanding commits (`todo`) reaches zero.
    fn wait_complete(&self) {
        while self.cf.read().unwrap().todo != 0 {
            // Acquiring `busy` waits for any commit the background thread is currently processing;
            // the guard is dropped at the end of each iteration.
            let _x = self.busy.lock();
        }
    }

    /// Set the stop flag (later commits are not queued) and wait for queued commits to finish.
    fn shutdown(&mut self) {
        self.cf.write().unwrap().stop = true;
        self.wait_complete();
    }
}
180
/// State shared between an `AtomicFile` and its background commit thread.
struct CommitFile {
    /// Buffered read access to the underlying storage.
    stg: ReadBufStg<256>,
    /// Writes that have been committed but may not yet have been applied to the underlying storage.
    map: WMap,
    /// Number of commits queued and not yet reported done via `done_one`.
    todo: usize,
    /// Set by `AtomicFile::shutdown`; once set, `AtomicFile::commit` no longer queues work.
    stop: bool,
}
191
192impl CommitFile {
193 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
194 Self {
195 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
196 map: WMap::default(),
197 todo: 0,
198 stop: false,
199 }
200 }
201
202 fn done_one(&mut self) {
203 self.todo -= 1;
204 if self.todo == 0 {
205 self.map = WMap::default();
206 self.stg.reset();
207 }
208 }
209}
210
impl BasicStorage for CommitFile {
    /// Not supported: always panics.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not supported: always panics.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Read from the pending writes, falling back to the buffered underlying storage for gaps.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &self.stg);
    }

    /// Record a write of `data[off..off + len]` at `start` in the pending map.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not supported: always panics (only `write_data` is implemented for this type).
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
232
233pub trait BasicStorage: Send {
237 fn size(&self) -> u64;
240
241 fn read(&self, start: u64, data: &mut [u8]);
243
244 fn write(&mut self, start: u64, data: &[u8]);
246
247 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
249 let len = data.len();
250 let d = Arc::new(data);
251 self.write_data(start, d, 0, len);
252 }
253
254 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
256 self.write(start, &data[off..off + len]);
257 }
258
259 fn commit(&mut self, size: u64);
261
262 fn write_u64(&mut self, start: u64, value: u64) {
264 self.write(start, &value.to_le_bytes());
265 }
266
267 fn read_u64(&self, start: u64) -> u64 {
269 let mut bytes = [0; 8];
270 self.read(start, &mut bytes);
271 u64::from_le_bytes(bytes)
272 }
273
274 fn wait_complete(&self){}
276
277 fn shutdown(&mut self){}
279}
280
/// A [`BasicStorage`] that is also `Sync` and can produce further boxed handles via `clone`.
pub trait Storage: BasicStorage + Sync {
    /// Return a new boxed `Storage` handle derived from this one.
    fn clone(&self) -> Box<dyn Storage>;
}
286
/// In-memory storage backed by a shared, mutex-protected byte vector.
#[derive(Default)]
pub struct MemFile {
    /// The stored bytes, shared between handles.
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Create a new, empty `MemFile` in a `Box`.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
299
300impl Storage for MemFile {
301 fn clone(&self) -> Box<dyn Storage> {
302 Box::new(Self { v: self.v.clone() })
303 }
304}
305
306impl BasicStorage for MemFile {
307 fn size(&self) -> u64 {
308 let v = self.v.lock().unwrap();
309 v.len() as u64
310 }
311
312 fn read(&self, off: u64, bytes: &mut [u8]) {
313 let off = off as usize;
314 let len = bytes.len();
315 let mut v = self.v.lock().unwrap();
316 if off + len > v.len() {
317 v.resize(off + len, 0);
318 }
319 bytes.copy_from_slice(&v[off..off + len]);
320 }
321
322 fn write(&mut self, off: u64, bytes: &[u8]) {
323 let off = off as usize;
324 let len = bytes.len();
325 let mut v = self.v.lock().unwrap();
326 if off + len > v.len() {
327 v.resize(off + len, 0);
328 }
329 v[off..off + len].copy_from_slice(bytes);
330 }
331
332 fn commit(&mut self, size: u64) {
333 let mut v = self.v.lock().unwrap();
334 v.resize(size as usize, 0);
335 }
336}
337
338use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
339
/// Thin wrapper around a `std::fs::File` offering positioned read/write, size and commit.
struct FileInner {
    /// The open file handle.
    f: fs::File,
}

impl FileInner {
    /// Open `filename` for reading and writing, creating it if absent and never truncating it.
    ///
    /// # Panics
    /// Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Self {
        Self {
            f: OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(filename)
                .unwrap(),
        }
    }

    /// Return the file size by seeking to the end of the file.
    fn size(&mut self) -> u64 {
        self.f.seek(SeekFrom::End(0)).unwrap()
    }

    /// Read into `bytes` starting at file offset `off`.
    ///
    /// Keeps reading until `bytes` is full or end-of-file is reached, so a short
    /// read from the OS does not leave part of the buffer unfilled. Any portion
    /// of `bytes` beyond end-of-file is left untouched.
    ///
    /// # Panics
    /// Panics on an I/O error other than `Interrupted`.
    fn read(&mut self, off: u64, bytes: &mut [u8]) {
        self.f.seek(SeekFrom::Start(off)).unwrap();
        let mut done = 0;
        while done < bytes.len() {
            match self.f.read(&mut bytes[done..]) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("file read failed: {:?}", e),
            }
        }
    }

    /// Write all of `bytes` at file offset `off`.
    ///
    /// # Panics
    /// Panics if seeking or writing fails.
    fn write(&mut self, off: u64, bytes: &[u8]) {
        // On targets other than Windows and Linux, explicitly extend the file up to
        // `off` before writing past the current end.
        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
        {
            let size = self.f.seek(SeekFrom::End(0)).unwrap();
            if off > size {
                self.f.set_len(off).unwrap();
            }
        }
        self.f.seek(SeekFrom::Start(off)).unwrap();
        // `write_all` retries short writes, so no data is silently dropped.
        self.f.write_all(bytes).unwrap();
    }

    /// Set the file length to `size` and sync data and metadata to disk.
    fn commit(&mut self, size: u64) {
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
385
386pub struct UpdFileStorage {
388 file: Cell<Option<FileInner>>,
389}
390
391impl UpdFileStorage {
392 pub fn new(filename: &str) -> Box<Self> {
394 Box::new(Self {
395 file: Cell::new(Some(FileInner::new(filename))),
396 })
397 }
398}
399
400impl BasicStorage for UpdFileStorage {
401 fn size(&self) -> u64 {
402 let mut f = self.file.take().unwrap();
403 let result = f.size();
404 self.file.set(Some(f));
405 result
406 }
407 fn read(&self, off: u64, bytes: &mut [u8]) {
408 let mut f = self.file.take().unwrap();
409 f.read(off, bytes);
410 self.file.set(Some(f));
411 }
412
413 fn write(&mut self, off: u64, bytes: &[u8]) {
414 let mut f = self.file.take().unwrap();
415 f.write(off, bytes);
416 self.file.set(Some(f));
417 }
418
419 fn commit(&mut self, size: u64) {
420 let mut f = self.file.take().unwrap();
421 f.commit(size);
422 self.file.set(Some(f));
423 }
424}
425
426pub struct SimpleFileStorage {
428 file: Arc<Mutex<FileInner>>,
429}
430
431impl SimpleFileStorage {
432 pub fn new(filename: &str) -> Box<Self> {
434 Box::new(Self {
435 file: Arc::new(Mutex::new(FileInner::new(filename))),
436 })
437 }
438}
439
440impl Storage for SimpleFileStorage {
441 fn clone(&self) -> Box<dyn Storage> {
442 Box::new(Self {
443 file: self.file.clone(),
444 })
445 }
446}
447
448impl BasicStorage for SimpleFileStorage {
449 fn size(&self) -> u64 {
450 self.file.lock().unwrap().size()
451 }
452
453 fn read(&self, off: u64, bytes: &mut [u8]) {
454 self.file.lock().unwrap().read(off, bytes);
455 }
456
457 fn write(&mut self, off: u64, bytes: &[u8]) {
458 self.file.lock().unwrap().write(off, bytes);
459 }
460
461 fn commit(&mut self, size: u64) {
462 self.file.lock().unwrap().commit(size);
463 }
464}
465
466pub struct AnyFileStorage {
468 filename: String,
469 files: Arc<Mutex<Vec<FileInner>>>,
470}
471
472impl AnyFileStorage {
473 pub fn new(filename: &str) -> Box<Self> {
475 Box::new(Self {
476 filename: filename.to_owned(),
477 files: Arc::new(Mutex::new(Vec::new())),
478 })
479 }
480
481 fn get_file(&self) -> FileInner {
482 match self.files.lock().unwrap().pop() {
483 Some(f) => f,
484 _ => FileInner::new(&self.filename),
485 }
486 }
487
488 fn put_file(&self, f: FileInner) {
489 self.files.lock().unwrap().push(f);
490 }
491}
492
493impl Storage for AnyFileStorage {
494 fn clone(&self) -> Box<dyn Storage> {
495 Box::new(Self {
496 filename: self.filename.clone(),
497 files: self.files.clone(),
498 })
499 }
500}
501
502impl BasicStorage for AnyFileStorage {
503 fn size(&self) -> u64 {
504 let mut f = self.get_file();
505 let result = f.size();
506 self.put_file(f);
507 result
508 }
509
510 fn read(&self, off: u64, bytes: &mut [u8]) {
511 let mut f = self.get_file();
512 f.read(off, bytes);
513 self.put_file(f);
514 }
515
516 fn write(&mut self, off: u64, bytes: &[u8]) {
517 let mut f = self.get_file();
518 f.write(off, bytes);
519 self.put_file(f);
520 }
521
522 fn commit(&mut self, size: u64) {
523 let mut f = self.get_file();
524 f.commit(size);
525 self.put_file(f);
526 }
527}
528
/// Storage that holds nothing: see its trait implementations, which ignore all data.
pub struct DummyFile {}

impl DummyFile {
    /// Create a boxed `DummyFile`.
    pub fn new() -> Box<Self> {
        let dummy = DummyFile {};
        Box::new(dummy)
    }
}
537
impl Storage for DummyFile {
    /// Return a fresh boxed `DummyFile`.
    fn clone(&self) -> Box<dyn Storage> {
        Self::new()
    }
}
543
impl BasicStorage for DummyFile {
    /// Always zero.
    fn size(&self) -> u64 {
        0
    }

    /// Does nothing; `_bytes` is left unchanged.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// Does nothing; the data is discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// Does nothing.
    fn commit(&mut self, _size: u64) {}
}
555
/// Memory and buffering limits used when constructing an [`AtomicFile`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Limits {
    /// Number of pending write-map entries above which a commit waits for the
    /// background thread to catch up.
    pub map_lim: usize,
    /// Memory budget for the read buffer (divided by the 256-byte sector size to
    /// give the maximum number of buffered sectors).
    pub rbuf_mem: usize,
    /// Size in bytes of the write buffer in front of the main storage.
    pub swbuf: usize,
    /// Size in bytes of the write buffer in front of the update storage.
    pub uwbuf: usize,
}

impl Default for Limits {
    /// Default limits: `map_lim` 5000, `rbuf_mem` 2 MiB, `swbuf` and `uwbuf` 1 MiB each.
    fn default() -> Self {
        Self {
            map_lim: 5000,
            rbuf_mem: 0x200000,
            swbuf: 0x100000,
            uwbuf: 0x100000,
        }
    }
}
579
580struct WriteBuffer {
582 ix: usize,
584 pos: u64,
586 pub stg: Box<dyn BasicStorage>,
588 buf: Vec<u8>,
590}
591
592impl WriteBuffer {
593 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
595 Self {
596 ix: 0,
597 pos: u64::MAX,
598 stg,
599 buf: vec![0; buf_size],
600 }
601 }
602
603 pub fn write(&mut self, off: u64, data: &[u8]) {
605 if self.pos + self.ix as u64 != off {
606 self.flush(off);
607 }
608 let mut done: usize = 0;
609 let mut todo: usize = data.len();
610 while todo > 0 {
611 let mut n: usize = self.buf.len() - self.ix;
612 if n == 0 {
613 self.flush(off + done as u64);
614 n = self.buf.len();
615 }
616 if n > todo {
617 n = todo;
618 }
619 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
620 todo -= n;
621 done += n;
622 self.ix += n;
623 }
624 }
625
626 fn flush(&mut self, new_pos: u64) {
627 if self.ix > 0 {
628 self.stg.write(self.pos, &self.buf[0..self.ix]);
629 }
630 self.ix = 0;
631 self.pos = new_pos;
632 }
633
634 pub fn commit(&mut self, size: u64) {
636 self.flush(u64::MAX);
637 self.stg.commit(size);
638 }
639
640 pub fn write_u64(&mut self, start: u64, value: u64) {
642 self.write(start, &value.to_le_bytes());
643 }
644}
645
/// Read-only storage wrapper that serves small reads through a sector buffer of `N`-byte sectors.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Sector buffer, behind a mutex so it can be filled from `&self` reads.
    buf: Mutex<ReadBuffer<N>>,
    /// Reads longer than this many bytes bypass the buffer.
    limit: usize,
}
659
impl<const N: usize> Drop for ReadBufStg<N> {
    /// Clear the sector buffer when the wrapper is dropped.
    fn drop(&mut self) {
        self.reset();
    }
}
665
666impl<const N: usize> ReadBufStg<N> {
667 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
669 Self {
670 stg,
671 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
672 limit,
673 }
674 }
675
676 fn reset(&mut self) {
678 self.buf.lock().unwrap().reset();
679 }
680}
681
682impl<const N: usize> BasicStorage for ReadBufStg<N> {
683 fn read(&self, start: u64, data: &mut [u8]) {
685 if data.len() <= self.limit {
686 self.buf.lock().unwrap().read(&*self.stg, start, data);
687 } else {
688 self.stg.read(start, data);
689 }
690 }
691
692 fn size(&self) -> u64 {
694 panic!()
695 }
696
697 fn write(&mut self, _start: u64, _data: &[u8]) {
699 panic!();
700 }
701
702 fn commit(&mut self, _size: u64) {
704 panic!();
705 }
706}
707
708struct ReadBuffer<const N: usize> {
709 map: HashMap<u64, Box<[u8; N]>>,
711 max_buf: usize,
713}
714
715impl<const N: usize> ReadBuffer<N> {
716 fn new(max_buf: usize) -> Self {
717 Self {
718 map: HashMap::default(),
719 max_buf,
720 }
721 }
722
723 fn reset(&mut self) {
724 self.map.clear();
725 }
726
727 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
728 let mut done = 0;
729 while done < data.len() {
730 let off = off + done as u64;
731 let sector = off / N as u64;
732 let disp = (off % N as u64) as usize;
733 let amount = min(data.len() - done, N - disp);
734
735 let p = self.map.entry(sector).or_insert_with(|| {
736 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
737 stg.read(sector * N as u64, &mut *p);
738 p
739 });
740 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
741 done += amount;
742 }
743 if self.map.len() >= self.max_buf {
744 self.reset();
745 }
746 }
747}
748
749#[derive(Default)]
750struct DataSlice {
752 pub data: Data,
754 pub off: usize,
756 pub len: usize,
758}
759
760impl DataSlice {
761 pub fn all(&self) -> &[u8] {
763 &self.data[self.off..self.off + self.len]
764 }
765 pub fn part(&self, off: usize, len: usize) -> &[u8] {
767 &self.data[self.off + off..self.off + off + len]
768 }
769 pub fn trim(&mut self, trim: usize) {
771 self.off += trim;
772 self.len -= trim;
773 }
774 #[allow(dead_code)]
776 pub fn take(&mut self) -> Data {
777 std::mem::take(&mut self.data)
778 }
779}
780
/// Map of pending writes: non-overlapping byte ranges, each keyed by its END offset.
#[derive(Default)]
struct WMap {
    /// Key is the end offset of the range; the value's `len` gives its start (`end - len`).
    map: BTreeMap<u64, DataSlice>,
}
787
impl WMap {
    /// True if no writes are recorded.
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of recorded ranges.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Empty the map, returning its ranges as `(start, slice)` pairs in key (end-offset) order.
    pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = GVec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are end offsets; recover the start from the length.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Replay every recorded range into `stg` via `write_data` (the data buffers are shared, not copied).
    pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    /// Record a write of `data[off..off + len]` at `start`, trimming, splitting or removing
    /// any existing ranges it overlaps. Zero-length writes are ignored.
    #[cfg(not(feature = "pstd"))]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            // Changes are collected here and applied after the iteration over the map ends.
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only ranges whose end is greater than `start` can overlap the new range.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                // `es..ee` is the existing range.
                let es = ee - v.len as u64;
                if es >= end {
                    // Existing range starts at or after the new end: no more overlaps.
                    break;
                } else if start <= es {
                    if end < ee {
                        // Existing range starts inside the new one and extends past it: drop its front.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // Existing range is entirely covered by the new one.
                    remove.push(ee);
                } else if end < ee {
                    // New range lies strictly inside the existing one: keep the head as a
                    // separate entry and trim the existing entry down to the tail.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // Existing range starts before the new one and ends inside it: keep only the head.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Record a write of `data[off..off + len]` at `start`, trimming, splitting or removing
    /// any existing ranges it overlaps. Zero-length writes are ignored.
    ///
    /// This variant edits the map in place through a `pstd` cursor.
    #[cfg(feature = "pstd")]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            // Cursor positioned at the first range whose end is greater than `start`.
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                // `es..ee` is the existing range.
                let es = ee - v.len as u64;
                if es >= end {
                    // No overlap: step back so the new range is inserted before this entry.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // Existing range extends past the new one: drop its front.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // Existing range is entirely covered by the new one.
                    c.remove_prev();
                } else if end < ee {
                    // New range lies strictly inside the existing one: insert the head before
                    // it and trim the existing entry down to the tail.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // Existing range ends inside the new one: shorten it to the head and
                    // update its key (the end offset) in place.
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Fill `data` with the bytes at `start`, taking them from recorded ranges where
    /// available and from `u` for any gaps between or after them.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            // Ranges whose end is greater than `start`, in ascending order.
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64;
                let doff = start + done as u64;
                if es > doff {
                    // Gap before this range: read it from the underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Offset within the range at which the wanted bytes begin.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Anything beyond the last recorded range comes from the underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
938
/// Storage that applies each commit in two phases, recording updates in a
/// separate update storage before applying them to the main storage.
pub struct BasicAtomicFile {
    /// Buffered main storage.
    stg: WriteBuffer,
    /// Buffered update storage, holding the records written in phase 1 of a commit.
    upd: WriteBuffer,
    /// Writes made since the last commit.
    map: WMap,
    /// Writes of the commit in progress (filled in phase 1, consumed in phase 2).
    list: GVec<(u64, DataSlice)>,
    /// Size of the main storage as of construction or the last completed commit.
    size: u64,
    /// Set by `shutdown`; once set, `commit` does nothing.
    stop: bool,
}
952
impl BasicAtomicFile {
    /// Construct from the main storage `stg` and update storage `upd`, with write-buffer
    /// sizes taken from `lim`. Any update records left in `upd` are applied first (see `init`).
    pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: GVec::new(),
            size,
            stop: false,
        });
        result.init();
        result
    }

    /// Apply any records found in the update storage to the main storage.
    ///
    /// The update storage starts with two `u64` values: the end offset of the records
    /// and the size to commit the main storage with. An end offset of zero means there
    /// is nothing to apply.
    ///
    /// # Panics
    /// Panics if the stored end offset is non-zero and differs from the update storage size.
    fn init(&mut self) {
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            return;
        }
        assert!(end == self.upd.stg.size());
        // Records follow the 16-byte header: start offset, length, then the data bytes.
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf: GVec<u8> = gvec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Empty the update storage now that its records have been applied.
        self.upd.commit(0);
    }

    /// Perform one phase of a commit; does nothing if there are no writes outstanding.
    ///
    /// Phase 1 records the writes: those at or beyond the current size go straight to the
    /// main storage, the rest are written as records to the update storage, whose header
    /// is only filled in after the records have been committed.
    /// Any other `phase` value applies the recorded writes below the current size to the
    /// main storage and then empties the update storage.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Start with a header whose end offset is zero, so `init` would see nothing to apply.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16);
            let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // At or beyond the current size: written directly to the main storage.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    // Below the current size: written as a record (start, length, data).
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            // Commit the records first, then write and commit the header with the real end offset.
            self.upd.commit(pos);
            self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    self.stg.write(*start, v.all());
                }
            }
            self.list = GVec::new();
            self.stg.commit(size);
            // The main storage is up to date; empty the update storage.
            self.upd.commit(0);
        }
    }
}
1046
1047impl BasicStorage for BasicAtomicFile {
1048 fn commit(&mut self, size: u64) {
1049 if self.stop { return; }
1050 self.commit_phase(size, 1);
1051 self.commit_phase(size, 2);
1052 self.size = size;
1053 }
1054
1055 fn size(&self) -> u64 {
1056 self.size
1057 }
1058
1059 fn read(&self, start: u64, data: &mut [u8]) {
1060 self.map.read(start, data, &*self.stg.stg);
1061 }
1062
1063 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1064 self.map.write(start, data, off, len);
1065 }
1066
1067 fn write(&mut self, start: u64, data: &[u8]) {
1068 let len = data.len();
1069 let d = Arc::new(data.to_vec());
1070 self.write_data(start, d, 0, len);
1071 }
1072
1073 fn shutdown(&mut self)
1074 {
1075 self.stop = true;
1076 }
1077}
1078
/// File storage for Unix targets using positioned reads and writes on a file handle.
#[cfg(target_family = "unix")]
pub struct UnixFileStorage {
    /// File size as of opening or the last commit, shared between handles.
    size: Arc<Mutex<u64>>,
    /// The open file handle.
    f: fs::File,
}

#[cfg(target_family = "unix")]
impl UnixFileStorage {
    /// Open `filename` for reading and writing (creating it if absent, never truncating)
    /// and record its current size.
    ///
    /// # Panics
    /// Panics if the file cannot be opened or its size determined.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        // Seeking to the end yields the current file length.
        let len = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1101
1102#[cfg(target_family = "unix")]
1103impl Storage for UnixFileStorage {
1104 fn clone(&self) -> Box<dyn Storage> {
1105 Box::new(Self {
1106 size: self.size.clone(),
1107 f: self.f.try_clone().unwrap(),
1108 })
1109 }
1110}
1111
1112#[cfg(target_family = "unix")]
1113use std::os::unix::fs::FileExt;
1114
1115#[cfg(target_family = "unix")]
1116impl BasicStorage for UnixFileStorage {
1117 fn read(&self, start: u64, data: &mut [u8]) {
1118 let _ = self.f.read_at(data, start);
1119 }
1120
1121 fn write(&mut self, start: u64, data: &[u8]) {
1122 let _ = self.f.write_at(data, start);
1123 }
1124
1125 fn size(&self) -> u64 {
1126 *self.size.lock().unwrap()
1127 }
1128
1129 fn commit(&mut self, size: u64) {
1130 *self.size.lock().unwrap() = size;
1131 self.f.set_len(size).unwrap();
1132 self.f.sync_all().unwrap();
1133 }
1134}
1135
/// File storage for Windows targets using positioned reads and writes on a file handle.
#[cfg(target_family = "windows")]
pub struct WindowsFileStorage {
    /// File size as of opening or the last commit, shared between handles.
    size: Arc<Mutex<u64>>,
    /// The open file handle.
    f: fs::File,
}

#[cfg(target_family = "windows")]
impl WindowsFileStorage {
    /// Open `filename` for reading and writing (creating it if absent, never truncating)
    /// and record its current size.
    ///
    /// # Panics
    /// Panics if the file cannot be opened or its size determined.
    pub fn new(filename: &str) -> Box<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let mut f = options.open(filename).unwrap();
        // Seeking to the end yields the current file length.
        let len = f.seek(SeekFrom::End(0)).unwrap();
        Box::new(Self {
            size: Arc::new(Mutex::new(len)),
            f,
        })
    }
}
1158
#[cfg(target_family = "windows")]
impl Storage for WindowsFileStorage {
    /// Return a handle with a duplicated file handle and the same shared size.
    ///
    /// Panics if the file handle cannot be duplicated.
    fn clone(&self) -> Box<dyn Storage> {
        let size = Arc::clone(&self.size);
        let f = self.f.try_clone().unwrap();
        Box::new(Self { size, f })
    }
}
1168
1169#[cfg(target_family = "windows")]
1170use std::os::windows::fs::FileExt;
1171
#[cfg(target_family = "windows")]
impl BasicStorage for WindowsFileStorage {
    /// Read into `data` from file offset `start`.
    ///
    /// Keeps reading until `data` is full or end-of-file is reached, so a short
    /// read does not leave part of the buffer unfilled. Any portion of `data`
    /// beyond end-of-file is left untouched.
    ///
    /// # Panics
    /// Panics on an I/O error other than `Interrupted`, rather than silently
    /// returning unfilled data.
    fn read(&self, start: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_read(&mut data[done..], start + done as u64) {
                Ok(0) => break, // End of file.
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("seek_read failed: {:?}", e),
            }
        }
    }

    /// Write all of `data` at file offset `start`, retrying short writes.
    ///
    /// # Panics
    /// Panics if the write fails or makes no progress, rather than silently dropping data.
    fn write(&mut self, start: u64, data: &[u8]) {
        let mut done = 0;
        while done < data.len() {
            match self.f.seek_write(&data[done..], start + done as u64) {
                Ok(0) => panic!("seek_write wrote zero bytes"),
                Ok(n) => done += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => panic!("seek_write failed: {:?}", e),
            }
        }
    }

    /// Size as of opening the file or the last commit on any handle sharing this size.
    fn size(&self) -> u64 {
        *self.size.lock().unwrap()
    }

    /// Record `size`, set the file length to it and sync the file to disk.
    fn commit(&mut self, size: u64) {
        *self.size.lock().unwrap() = size;
        self.f.set_len(size).unwrap();
        self.f.sync_all().unwrap();
    }
}
1192
/// Platform-selected [`Storage`] implementation: `WindowsFileStorage` on Windows targets.
#[cfg(target_family = "windows")]
pub type MultiFileStorage = WindowsFileStorage;

/// Platform-selected [`Storage`] implementation: `UnixFileStorage` on Unix targets.
#[cfg(target_family = "unix")]
pub type MultiFileStorage = UnixFileStorage;

/// Platform-selected [`Storage`] implementation: `AnyFileStorage` on targets that are neither Unix nor Windows.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
pub type MultiFileStorage = AnyFileStorage;

/// Platform-selected storage: the same as [`MultiFileStorage`] on Windows and Unix targets.
#[cfg(any(target_family = "windows", target_family = "unix"))]
pub type FastFileStorage = MultiFileStorage;

/// Platform-selected storage: `UpdFileStorage` on targets that are neither Windows nor Unix.
#[cfg(not(any(target_family = "windows", target_family = "unix")))]
pub type FastFileStorage = UpdFileStorage;
1212
/// Test scale factor read from the `TA` environment variable; 1 when the variable
/// cannot be read. Panics if the value is present but is not a valid `usize`.
#[cfg(test)]
fn test_amount() -> usize {
    match std::env::var("TA") {
        Ok(text) => text.parse().unwrap(),
        Err(_) => 1,
    }
}
1218
// Randomised test: applies the same random writes, reads and commits to an
// `AtomicFile` (over two `MemFile`s) and to a plain `MemFile`, and checks
// that every read returns identical bytes from both.
#[test]
fn test_atomic_file() {
    use rand::Rng;
    // Scale factor for the number of operations per round.
    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        // s1 is the storage under test; s2 is the reference.
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Offsets in 0..100 and lengths in 1..=20.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Write the same `len` random bytes to both storages.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read the same range from both and compare.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
            // Commit both with size 200 on roughly one operation in fifty.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}