1#![deny(missing_docs)]
16
17use rustc_hash::FxHashMap as HashMap;
18use std::cell::Cell;
19use std::cmp::min;
20use std::sync::{Arc, Mutex, RwLock};
21
22#[cfg(feature = "pstd")]
23use pstd::{
24 VecA,
25 collections::{BTreeMapA, btree_map::CustomTuning},
26 localalloc::GTemp,
27 veca as gvec,
28};
29
30#[cfg(not(feature = "pstd"))]
31use std::{collections::BTreeMap, vec as gvec, vec::Vec as GVec};
32
33#[cfg(feature = "pstd")]
34type BTreeMap<K, V> = BTreeMapA<K, V, CustomTuning<GTemp>>;
35
36#[cfg(feature = "pstd")]
37type GVec<T> = VecA<T, GTemp>;
38
39pub type Data = Arc<Vec<u8>>;
41
42pub struct AtomicFile {
59 map: WMap,
61 cf: Arc<RwLock<CommitFile>>,
63 size: u64,
65 tx: std::sync::mpsc::Sender<(u64, WMap)>,
67 busy: Arc<Mutex<()>>,
69 map_lim: usize,
71}
72
73impl AtomicFile {
74 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn BasicStorage>) -> Box<Self> {
76 Self::new_with_limits(stg, upd, &Limits::default())
77 }
78
79 pub fn new_with_limits(
81 stg: Box<dyn Storage>,
82 upd: Box<dyn BasicStorage>,
83 lim: &Limits,
84 ) -> Box<Self> {
85 let size = stg.size();
86 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
87
88 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
89 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
90 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
94
95 std::thread::spawn(move || {
96 while let Ok((size, map)) = rx.recv() {
98 let _lock = busy1.lock();
99 baf.map = map;
100 baf.commit(size);
101 cf1.write().unwrap().done_one();
102 }
103 });
104 Box::new(Self {
105 map: WMap::default(),
106 cf,
107 size,
108 tx,
109 busy,
110 map_lim: lim.map_lim,
111 })
112 }
113}
114
115impl Storage for AtomicFile {
116 fn clone(&self) -> Box<dyn Storage> {
117 panic!()
118 }
119}
120
121impl BasicStorage for AtomicFile {
122 fn commit(&mut self, size: u64) {
123 self.size = size;
124 if self.map.is_empty() {
125 return;
126 }
127 if self.cf.read().unwrap().map.len() > self.map_lim {
128 self.wait_complete();
129 }
130 let map = std::mem::take(&mut self.map);
131 let stop =
132 {
133 let cf = &mut *self.cf.write().unwrap();
134 if cf.stop { true }
135 else
136 {
137 cf.todo += 1;
138 map.to_storage(cf);
140 self.tx.send((size, map)).unwrap();
142 false
143 }
144 };
145 while stop {
146 std::thread::sleep(std::time::Duration::from_millis(100));
148 }
149 }
150
151 fn size(&self) -> u64 {
152 self.size
153 }
154
155 fn read(&self, start: u64, data: &mut [u8]) {
156 self.map.read(start, data, &*self.cf.read().unwrap());
157 }
158
159 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
160 self.map.write(start, data, off, len);
161 }
162
163 fn write(&mut self, start: u64, data: &[u8]) {
164 let len = data.len();
165 let d = Arc::new(data.to_vec());
166 self.write_data(start, d, 0, len);
167 }
168
169 fn wait_complete(&self) {
170 while self.cf.read().unwrap().busy()
171 {
172 let _x = self.busy.lock();
173 }
174 }
175
176 fn shutdown(&mut self) {
177 self.cf.write().unwrap().stop = true; while self.cf.read().unwrap().todo != 0 {
179 let _x = self.busy.lock();
180 }
181 }
182}
183
184struct CommitFile {
185 stg: ReadBufStg<256>,
187 map: WMap,
189 todo: usize,
191 stop: bool,
193}
194
195impl CommitFile {
196 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
197 Self {
198 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
199 map: WMap::default(),
200 todo: 0,
201 stop: false,
202 }
203 }
204
205 fn done_one(&mut self) {
206 self.todo -= 1;
207 if self.todo == 0 {
208 self.map = WMap::default();
209 self.stg.reset();
210 }
211 }
212
213 fn busy(&self) -> bool
214 {
215 self.todo != 0 || self.stop
216 }
217}
218
219impl BasicStorage for CommitFile {
220 fn commit(&mut self, _size: u64) {
221 panic!()
222 }
223
224 fn size(&self) -> u64 {
225 panic!()
226 }
227
228 fn read(&self, start: u64, data: &mut [u8]) {
229 self.map.read(start, data, &self.stg);
230 }
231
232 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
233 self.map.write(start, data, off, len);
234 }
235
236 fn write(&mut self, _start: u64, _data: &[u8]) {
237 panic!()
238 }
239}
240
241pub trait BasicStorage: Send {
245 fn size(&self) -> u64;
248
249 fn read(&self, start: u64, data: &mut [u8]);
251
252 fn write(&mut self, start: u64, data: &[u8]);
254
255 fn write_vec(&mut self, start: u64, data: Vec<u8>) {
257 let len = data.len();
258 let d = Arc::new(data);
259 self.write_data(start, d, 0, len);
260 }
261
262 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
264 self.write(start, &data[off..off + len]);
265 }
266
267 fn commit(&mut self, size: u64);
269
270 fn write_u64(&mut self, start: u64, value: u64) {
272 self.write(start, &value.to_le_bytes());
273 }
274
275 fn read_u64(&self, start: u64) -> u64 {
277 let mut bytes = [0; 8];
278 self.read(start, &mut bytes);
279 u64::from_le_bytes(bytes)
280 }
281
282 fn wait_complete(&self){}
284
285 fn shutdown(&mut self){}
287}
288
289pub trait Storage: BasicStorage + Sync {
291 fn clone(&self) -> Box<dyn Storage>;
293}
294
295#[derive(Default)]
297pub struct MemFile {
298 v: Arc<Mutex<Vec<u8>>>,
299}
300
301impl MemFile {
302 pub fn new() -> Box<Self> {
304 Box::default()
305 }
306}
307
308impl Storage for MemFile {
309 fn clone(&self) -> Box<dyn Storage> {
310 Box::new(Self { v: self.v.clone() })
311 }
312}
313
314impl BasicStorage for MemFile {
315 fn size(&self) -> u64 {
316 let v = self.v.lock().unwrap();
317 v.len() as u64
318 }
319
320 fn read(&self, off: u64, bytes: &mut [u8]) {
321 let off = off as usize;
322 let len = bytes.len();
323 let mut v = self.v.lock().unwrap();
324 if off + len > v.len() {
325 v.resize(off + len, 0);
326 }
327 bytes.copy_from_slice(&v[off..off + len]);
328 }
329
330 fn write(&mut self, off: u64, bytes: &[u8]) {
331 let off = off as usize;
332 let len = bytes.len();
333 let mut v = self.v.lock().unwrap();
334 if off + len > v.len() {
335 v.resize(off + len, 0);
336 }
337 v[off..off + len].copy_from_slice(bytes);
338 }
339
340 fn commit(&mut self, size: u64) {
341 let mut v = self.v.lock().unwrap();
342 v.resize(size as usize, 0);
343 }
344}
345
346use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
347
348struct FileInner {
349 f: fs::File,
350}
351
352impl FileInner {
353 pub fn new(filename: &str) -> Self {
355 Self {
356 f: OpenOptions::new()
357 .read(true)
358 .write(true)
359 .create(true)
360 .truncate(false)
361 .open(filename)
362 .unwrap(),
363 }
364 }
365
366 fn size(&mut self) -> u64 {
367 self.f.seek(SeekFrom::End(0)).unwrap()
368 }
369
370 fn read(&mut self, off: u64, bytes: &mut [u8]) {
371 self.f.seek(SeekFrom::Start(off)).unwrap();
372 let _ = self.f.read(bytes).unwrap();
373 }
374
375 fn write(&mut self, off: u64, bytes: &[u8]) {
376 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
378 {
379 let size = self.f.seek(SeekFrom::End(0)).unwrap();
380 if off > size {
381 self.f.set_len(off).unwrap();
382 }
383 }
384 self.f.seek(SeekFrom::Start(off)).unwrap();
385 let _ = self.f.write(bytes).unwrap();
386 }
387
388 fn commit(&mut self, size: u64) {
389 self.f.set_len(size).unwrap();
390 self.f.sync_all().unwrap();
391 }
392}
393
394pub struct UpdFileStorage {
396 file: Cell<Option<FileInner>>,
397}
398
399impl UpdFileStorage {
400 pub fn new(filename: &str) -> Box<Self> {
402 Box::new(Self {
403 file: Cell::new(Some(FileInner::new(filename))),
404 })
405 }
406}
407
408impl BasicStorage for UpdFileStorage {
409 fn size(&self) -> u64 {
410 let mut f = self.file.take().unwrap();
411 let result = f.size();
412 self.file.set(Some(f));
413 result
414 }
415 fn read(&self, off: u64, bytes: &mut [u8]) {
416 let mut f = self.file.take().unwrap();
417 f.read(off, bytes);
418 self.file.set(Some(f));
419 }
420
421 fn write(&mut self, off: u64, bytes: &[u8]) {
422 let mut f = self.file.take().unwrap();
423 f.write(off, bytes);
424 self.file.set(Some(f));
425 }
426
427 fn commit(&mut self, size: u64) {
428 let mut f = self.file.take().unwrap();
429 f.commit(size);
430 self.file.set(Some(f));
431 }
432}
433
434pub struct SimpleFileStorage {
436 file: Arc<Mutex<FileInner>>,
437}
438
439impl SimpleFileStorage {
440 pub fn new(filename: &str) -> Box<Self> {
442 Box::new(Self {
443 file: Arc::new(Mutex::new(FileInner::new(filename))),
444 })
445 }
446}
447
448impl Storage for SimpleFileStorage {
449 fn clone(&self) -> Box<dyn Storage> {
450 Box::new(Self {
451 file: self.file.clone(),
452 })
453 }
454}
455
456impl BasicStorage for SimpleFileStorage {
457 fn size(&self) -> u64 {
458 self.file.lock().unwrap().size()
459 }
460
461 fn read(&self, off: u64, bytes: &mut [u8]) {
462 self.file.lock().unwrap().read(off, bytes);
463 }
464
465 fn write(&mut self, off: u64, bytes: &[u8]) {
466 self.file.lock().unwrap().write(off, bytes);
467 }
468
469 fn commit(&mut self, size: u64) {
470 self.file.lock().unwrap().commit(size);
471 }
472}
473
474pub struct AnyFileStorage {
476 filename: String,
477 files: Arc<Mutex<Vec<FileInner>>>,
478}
479
480impl AnyFileStorage {
481 pub fn new(filename: &str) -> Box<Self> {
483 Box::new(Self {
484 filename: filename.to_owned(),
485 files: Arc::new(Mutex::new(Vec::new())),
486 })
487 }
488
489 fn get_file(&self) -> FileInner {
490 match self.files.lock().unwrap().pop() {
491 Some(f) => f,
492 _ => FileInner::new(&self.filename),
493 }
494 }
495
496 fn put_file(&self, f: FileInner) {
497 self.files.lock().unwrap().push(f);
498 }
499}
500
501impl Storage for AnyFileStorage {
502 fn clone(&self) -> Box<dyn Storage> {
503 Box::new(Self {
504 filename: self.filename.clone(),
505 files: self.files.clone(),
506 })
507 }
508}
509
510impl BasicStorage for AnyFileStorage {
511 fn size(&self) -> u64 {
512 let mut f = self.get_file();
513 let result = f.size();
514 self.put_file(f);
515 result
516 }
517
518 fn read(&self, off: u64, bytes: &mut [u8]) {
519 let mut f = self.get_file();
520 f.read(off, bytes);
521 self.put_file(f);
522 }
523
524 fn write(&mut self, off: u64, bytes: &[u8]) {
525 let mut f = self.get_file();
526 f.write(off, bytes);
527 self.put_file(f);
528 }
529
530 fn commit(&mut self, size: u64) {
531 let mut f = self.get_file();
532 f.commit(size);
533 self.put_file(f);
534 }
535}
536
537pub struct DummyFile {}
539impl DummyFile {
540 pub fn new() -> Box<Self> {
542 Box::new(Self {})
543 }
544}
545
546impl Storage for DummyFile {
547 fn clone(&self) -> Box<dyn Storage> {
548 Self::new()
549 }
550}
551
552impl BasicStorage for DummyFile {
553 fn size(&self) -> u64 {
554 0
555 }
556
557 fn read(&self, _off: u64, _bytes: &mut [u8]) {}
558
559 fn write(&mut self, _off: u64, _bytes: &[u8]) {}
560
561 fn commit(&mut self, _size: u64) {}
562}
563
564#[non_exhaustive]
566pub struct Limits {
567 pub map_lim: usize,
569 pub rbuf_mem: usize,
571 pub swbuf: usize,
573 pub uwbuf: usize,
575}
576
577impl Default for Limits {
578 fn default() -> Self {
579 Self {
580 map_lim: 5000,
581 rbuf_mem: 0x200000,
582 swbuf: 0x100000,
583 uwbuf: 0x100000,
584 }
585 }
586}
587
588struct WriteBuffer {
590 ix: usize,
592 pos: u64,
594 pub stg: Box<dyn BasicStorage>,
596 buf: Vec<u8>,
598}
599
600impl WriteBuffer {
601 pub fn new(stg: Box<dyn BasicStorage>, buf_size: usize) -> Self {
603 Self {
604 ix: 0,
605 pos: u64::MAX,
606 stg,
607 buf: vec![0; buf_size],
608 }
609 }
610
611 pub fn write(&mut self, off: u64, data: &[u8]) {
613 if self.pos + self.ix as u64 != off {
614 self.flush(off);
615 }
616 let mut done: usize = 0;
617 let mut todo: usize = data.len();
618 while todo > 0 {
619 let mut n: usize = self.buf.len() - self.ix;
620 if n == 0 {
621 self.flush(off + done as u64);
622 n = self.buf.len();
623 }
624 if n > todo {
625 n = todo;
626 }
627 self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
628 todo -= n;
629 done += n;
630 self.ix += n;
631 }
632 }
633
634 fn flush(&mut self, new_pos: u64) {
635 if self.ix > 0 {
636 self.stg.write(self.pos, &self.buf[0..self.ix]);
637 }
638 self.ix = 0;
639 self.pos = new_pos;
640 }
641
642 pub fn commit(&mut self, size: u64) {
644 self.flush(u64::MAX);
645 self.stg.commit(size);
646 }
647
648 pub fn write_u64(&mut self, start: u64, value: u64) {
650 self.write(start, &value.to_le_bytes());
651 }
652}
653
654struct ReadBufStg<const N: usize> {
660 stg: Box<dyn Storage>,
662 buf: Mutex<ReadBuffer<N>>,
664 limit: usize,
666}
667
668impl<const N: usize> Drop for ReadBufStg<N> {
669 fn drop(&mut self) {
670 self.reset();
671 }
672}
673
674impl<const N: usize> ReadBufStg<N> {
675 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
677 Self {
678 stg,
679 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
680 limit,
681 }
682 }
683
684 fn reset(&mut self) {
686 self.buf.lock().unwrap().reset();
687 }
688}
689
690impl<const N: usize> BasicStorage for ReadBufStg<N> {
691 fn read(&self, start: u64, data: &mut [u8]) {
693 if data.len() <= self.limit {
694 self.buf.lock().unwrap().read(&*self.stg, start, data);
695 } else {
696 self.stg.read(start, data);
697 }
698 }
699
700 fn size(&self) -> u64 {
702 panic!()
703 }
704
705 fn write(&mut self, _start: u64, _data: &[u8]) {
707 panic!();
708 }
709
710 fn commit(&mut self, _size: u64) {
712 panic!();
713 }
714}
715
716struct ReadBuffer<const N: usize> {
717 map: HashMap<u64, Box<[u8; N]>>,
719 max_buf: usize,
721}
722
723impl<const N: usize> ReadBuffer<N> {
724 fn new(max_buf: usize) -> Self {
725 Self {
726 map: HashMap::default(),
727 max_buf,
728 }
729 }
730
731 fn reset(&mut self) {
732 self.map.clear();
733 }
734
735 fn read(&mut self, stg: &dyn BasicStorage, off: u64, data: &mut [u8]) {
736 let mut done = 0;
737 while done < data.len() {
738 let off = off + done as u64;
739 let sector = off / N as u64;
740 let disp = (off % N as u64) as usize;
741 let amount = min(data.len() - done, N - disp);
742
743 let p = self.map.entry(sector).or_insert_with(|| {
744 let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
745 stg.read(sector * N as u64, &mut *p);
746 p
747 });
748 data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
749 done += amount;
750 }
751 if self.map.len() >= self.max_buf {
752 self.reset();
753 }
754 }
755}
756
757#[derive(Default)]
758struct DataSlice {
760 pub data: Data,
762 pub off: usize,
764 pub len: usize,
766}
767
768impl DataSlice {
769 pub fn all(&self) -> &[u8] {
771 &self.data[self.off..self.off + self.len]
772 }
773 pub fn part(&self, off: usize, len: usize) -> &[u8] {
775 &self.data[self.off + off..self.off + off + len]
776 }
777 pub fn trim(&mut self, trim: usize) {
779 self.off += trim;
780 self.len -= trim;
781 }
782 #[allow(dead_code)]
784 pub fn take(&mut self) -> Data {
785 std::mem::take(&mut self.data)
786 }
787}
788
789#[derive(Default)]
790struct WMap {
792 map: BTreeMap<u64, DataSlice>,
794}
795
796impl WMap {
797 pub fn is_empty(&self) -> bool {
799 self.map.is_empty()
800 }
801
802 pub fn len(&self) -> usize {
804 self.map.len()
805 }
806
807 pub fn convert_to_vec(&mut self) -> GVec<(u64, DataSlice)> {
809 let map = std::mem::take(&mut self.map);
810 let mut result = GVec::with_capacity(map.len());
811 for (end, v) in map {
812 let start = end - v.len as u64;
813 result.push((start, v));
814 }
815 result
816 }
817
818 pub fn to_storage(&self, stg: &mut dyn BasicStorage) {
820 for (end, v) in self.map.iter() {
821 let start = end - v.len as u64;
822 stg.write_data(start, v.data.clone(), v.off, v.len);
823 }
824 }
825
826 #[cfg(not(feature = "pstd"))]
827 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
829 if len != 0 {
830 let (mut insert, mut remove) = (Vec::new(), Vec::new());
831 let end = start + len as u64;
832 for (ee, v) in self.map.range_mut(start + 1..) {
833 let ee = *ee;
834 let es = ee - v.len as u64; if es >= end {
836 break;
838 } else if start <= es {
839 if end < ee {
840 v.trim((end - es) as usize);
842 break;
843 }
844 remove.push(ee);
846 } else if end < ee {
847 insert.push((es, v.data.clone(), v.off, (start - es) as usize));
850 v.trim((end - es) as usize);
851 break;
852 } else {
853 insert.push((es, v.take(), v.off, (start - es) as usize));
856 remove.push(ee);
857 }
858 }
859 for end in remove {
860 self.map.remove(&end);
861 }
862 for (start, data, off, len) in insert {
863 self.map
864 .insert(start + len as u64, DataSlice { data, off, len });
865 }
866 self.map
867 .insert(start + len as u64, DataSlice { data, off, len });
868 }
869 }
870
871 #[cfg(feature = "pstd")]
872 pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
874 if len != 0 {
875 let end = start + len as u64;
876 let mut c = self
877 .map
878 .lower_bound_mut(std::ops::Bound::Excluded(&start))
879 .with_mutable_key();
880 while let Some((eend, v)) = c.next() {
881 let ee = *eend;
882 let es = ee - v.len as u64; if es >= end {
884 c.prev();
886 break;
887 } else if start <= es {
888 if end < ee {
889 v.trim((end - es) as usize);
891 c.prev();
892 break;
893 }
894 c.remove_prev();
896 } else if end < ee {
897 let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
900 v.trim((end - es) as usize);
901 c.prev();
902 c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
903 break;
904 } else {
905 v.len = (start - es) as usize;
908 *eend = es + v.len as u64;
909 }
910 }
911 c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
913 }
914 }
915
916 pub fn read(&self, start: u64, data: &mut [u8], u: &dyn BasicStorage) {
918 let len = data.len();
919 if len != 0 {
920 let mut done = 0;
921 for (&end, v) in self.map.range(start + 1..) {
922 let es = end - v.len as u64; let doff = start + done as u64;
924 if es > doff {
925 let a = min(len - done, (es - doff) as usize);
927 u.read(doff, &mut data[done..done + a]);
928 done += a;
929 if done == len {
930 return;
931 }
932 }
933 let skip = (start + done as u64 - es) as usize;
935 let a = min(len - done, v.len - skip);
936 data[done..done + a].copy_from_slice(v.part(skip, a));
937 done += a;
938 if done == len {
939 return;
940 }
941 }
942 u.read(start + done as u64, &mut data[done..]);
943 }
944 }
945}
946
947pub struct BasicAtomicFile {
949 stg: WriteBuffer,
951 upd: WriteBuffer,
953 map: WMap,
955 list: GVec<(u64, DataSlice)>,
957 size: u64,
958 stop: bool,
959}
960
961impl BasicAtomicFile {
962 pub fn new(stg: Box<dyn BasicStorage>, upd: Box<dyn BasicStorage>, lim: &Limits) -> Box<Self> {
964 let size = stg.size();
965 let mut result = Box::new(Self {
966 stg: WriteBuffer::new(stg, lim.swbuf),
967 upd: WriteBuffer::new(upd, lim.uwbuf),
968 map: WMap::default(),
969 list: GVec::new(),
970 size,
971 stop: false,
972 });
973 result.init();
974 result
975 }
976
977 fn init(&mut self) {
979 let end = self.upd.stg.read_u64(0);
980 let size = self.upd.stg.read_u64(8);
981 if end == 0 {
982 return;
983 }
984 assert!(end == self.upd.stg.size());
985 let mut pos = 16;
986 while pos < end {
987 let start = self.upd.stg.read_u64(pos);
988 pos += 8;
989 let len = self.upd.stg.read_u64(pos);
990 pos += 8;
991 let mut buf: GVec<u8> = gvec![0; len as usize];
992 self.upd.stg.read(pos, &mut buf);
993 pos += len;
994 self.stg.write(start, &buf);
995 }
996 self.stg.commit(size);
997 self.upd.commit(0);
998 }
999
1000 pub fn commit_phase(&mut self, size: u64, phase: u8) {
1002 if self.map.is_empty() && self.list.is_empty() {
1003 return;
1004 }
1005 if phase == 1 {
1006 self.list = self.map.convert_to_vec();
1007
1008 self.upd.write_u64(0, 0);
1011 self.upd.write_u64(8, size);
1012 self.upd.commit(16); let mut stg_written = false;
1016 let mut pos: u64 = 16;
1017 for (start, v) in self.list.iter() {
1018 let (start, len, data) = (*start, v.len as u64, v.all());
1019 if start >= self.size {
1020 stg_written = true;
1022 self.stg.write(start, data);
1023 } else {
1024 self.upd.write_u64(pos, start);
1025 pos += 8;
1026 self.upd.write_u64(pos, len);
1027 pos += 8;
1028 self.upd.write(pos, data);
1029 pos += len;
1030 }
1031 }
1032 if stg_written {
1033 self.stg.commit(size);
1034 }
1035 self.upd.commit(pos); self.upd.write_u64(0, pos);
1039 self.upd.write_u64(8, size);
1040 self.upd.commit(pos);
1041 } else {
1042 for (start, v) in self.list.iter() {
1043 if *start < self.size {
1044 self.stg.write(*start, v.all());
1046 }
1047 }
1048 self.list = GVec::new();
1049 self.stg.commit(size);
1050 self.upd.commit(0);
1051 }
1052 }
1053}
1054
1055impl BasicStorage for BasicAtomicFile {
1056 fn commit(&mut self, size: u64) {
1057 if self.stop { return; }
1058 self.commit_phase(size, 1);
1059 self.commit_phase(size, 2);
1060 self.size = size;
1061 }
1062
1063 fn size(&self) -> u64 {
1064 self.size
1065 }
1066
1067 fn read(&self, start: u64, data: &mut [u8]) {
1068 self.map.read(start, data, &*self.stg.stg);
1069 }
1070
1071 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
1072 self.map.write(start, data, off, len);
1073 }
1074
1075 fn write(&mut self, start: u64, data: &[u8]) {
1076 let len = data.len();
1077 let d = Arc::new(data.to_vec());
1078 self.write_data(start, d, 0, len);
1079 }
1080
1081 fn shutdown(&mut self)
1082 {
1083 self.stop = true;
1084 }
1085}
1086
1087#[cfg(target_family = "unix")]
1089pub struct UnixFileStorage {
1090 size: Arc<Mutex<u64>>,
1091 f: fs::File,
1092}
1093#[cfg(target_family = "unix")]
1094impl UnixFileStorage {
1095 pub fn new(filename: &str) -> Box<Self> {
1097 let mut f = OpenOptions::new()
1098 .read(true)
1099 .write(true)
1100 .create(true)
1101 .truncate(false)
1102 .open(filename)
1103 .unwrap();
1104 let size = f.seek(SeekFrom::End(0)).unwrap();
1105 let size = Arc::new(Mutex::new(size));
1106 Box::new(Self { size, f })
1107 }
1108}
1109
1110#[cfg(target_family = "unix")]
1111impl Storage for UnixFileStorage {
1112 fn clone(&self) -> Box<dyn Storage> {
1113 Box::new(Self {
1114 size: self.size.clone(),
1115 f: self.f.try_clone().unwrap(),
1116 })
1117 }
1118}
1119
1120#[cfg(target_family = "unix")]
1121use std::os::unix::fs::FileExt;
1122
1123#[cfg(target_family = "unix")]
1124impl BasicStorage for UnixFileStorage {
1125 fn read(&self, start: u64, data: &mut [u8]) {
1126 let _ = self.f.read_at(data, start);
1127 }
1128
1129 fn write(&mut self, start: u64, data: &[u8]) {
1130 let _ = self.f.write_at(data, start);
1131 }
1132
1133 fn size(&self) -> u64 {
1134 *self.size.lock().unwrap()
1135 }
1136
1137 fn commit(&mut self, size: u64) {
1138 *self.size.lock().unwrap() = size;
1139 self.f.set_len(size).unwrap();
1140 self.f.sync_all().unwrap();
1141 }
1142}
1143
1144#[cfg(target_family = "windows")]
1146pub struct WindowsFileStorage {
1147 size: Arc<Mutex<u64>>,
1148 f: fs::File,
1149}
1150#[cfg(target_family = "windows")]
1151impl WindowsFileStorage {
1152 pub fn new(filename: &str) -> Box<Self> {
1154 let mut f = OpenOptions::new()
1155 .read(true)
1156 .write(true)
1157 .create(true)
1158 .truncate(false)
1159 .open(filename)
1160 .unwrap();
1161 let size = f.seek(SeekFrom::End(0)).unwrap();
1162 let size = Arc::new(Mutex::new(size));
1163 Box::new(Self { size, f })
1164 }
1165}
1166
1167#[cfg(target_family = "windows")]
1168impl Storage for WindowsFileStorage {
1169 fn clone(&self) -> Box<dyn Storage> {
1170 Box::new(Self {
1171 size: self.size.clone(),
1172 f: self.f.try_clone().unwrap(),
1173 })
1174 }
1175}
1176
1177#[cfg(target_family = "windows")]
1178use std::os::windows::fs::FileExt;
1179
1180#[cfg(target_family = "windows")]
1181impl BasicStorage for WindowsFileStorage {
1182 fn read(&self, start: u64, data: &mut [u8]) {
1183 let _ = self.f.seek_read(data, start);
1184 }
1185
1186 fn write(&mut self, start: u64, data: &[u8]) {
1187 let _ = self.f.seek_write(data, start);
1188 }
1189
1190 fn size(&self) -> u64 {
1191 *self.size.lock().unwrap()
1192 }
1193
1194 fn commit(&mut self, size: u64) {
1195 *self.size.lock().unwrap() = size;
1196 self.f.set_len(size).unwrap();
1197 self.f.sync_all().unwrap();
1198 }
1199}
1200
1201#[cfg(target_family = "windows")]
1203pub type MultiFileStorage = WindowsFileStorage;
1204
1205#[cfg(target_family = "unix")]
1207pub type MultiFileStorage = UnixFileStorage;
1208
1209#[cfg(not(any(target_family = "unix", target_family = "windows")))]
1211pub type MultiFileStorage = AnyFileStorage;
1212
1213#[cfg(any(target_family = "windows", target_family = "unix"))]
1215pub type FastFileStorage = MultiFileStorage;
1216
1217#[cfg(not(any(target_family = "windows", target_family = "unix")))]
1219pub type FastFileStorage = UpdFileStorage;
1220
1221#[cfg(test)]
1222fn test_amount() -> usize {
1224 str::parse(&std::env::var("TA").unwrap_or("1".to_string())).unwrap()
1225}
1226
1227#[test]
1228fn test_atomic_file() {
1229 use rand::Rng;
1230 let ta = test_amount();
1233 println!(" Test amount={}", ta);
1234
1235 let mut rng = rand::thread_rng();
1236
1237 for _ in 0..100 {
1238 let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
1239 let mut s2 = MemFile::new();
1241
1242 for _ in 0..1000 * ta {
1243 let off: usize = rng.r#gen::<usize>() % 100;
1244 let mut len = 1 + rng.r#gen::<usize>() % 20;
1245 let w: bool = rng.r#gen();
1246 if w {
1247 let mut bytes = Vec::new();
1248 while len > 0 {
1249 len -= 1;
1250 let b: u8 = rng.r#gen::<u8>();
1251 bytes.push(b);
1252 }
1253 s1.write(off as u64, &bytes);
1254 s2.write(off as u64, &bytes);
1255 } else {
1256 let mut b2 = vec![0; len];
1257 let mut b3 = vec![0; len];
1258 s1.read(off as u64, &mut b2);
1259 s2.read(off as u64, &mut b3);
1260 assert!(b2 == b3);
1261 }
1262 if rng.r#gen::<usize>() % 50 == 0 {
1263 s1.commit(200);
1264 s2.commit(200);
1265 }
1266 }
1267 }
1268}