1use rustc_hash::FxHashMap as HashMap;
6use std::cmp::min;
7use std::sync::{Arc, Mutex, RwLock};
8
9#[cfg(feature = "pstd")]
10use pstd::collections::BTreeMap;
11#[cfg(not(feature = "pstd"))]
12use std::collections::BTreeMap;
13
/// Shared immutable byte data, cheap to clone (reference-counted).
pub type Data = Arc<Vec<u8>>;
16
/// Storage whose commits are atomic, with the (potentially slow) work of
/// applying each commit done by a background thread.
pub struct AtomicFile {
    /// Uncommitted writes, pending until `commit`.
    map: WMap,
    /// Committed state shared with the background thread; reads fall back to it.
    cf: Arc<RwLock<CommitFile>>,
    /// Logical file size, as set by the last `commit`.
    size: u64,
    /// Sends each committed (size, write map) to the background thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by the background thread while applying a commit; `wait_complete` blocks on it.
    busy: Arc<Mutex<()>>,
    /// When the committed map exceeds this many entries, `commit` waits for the thread.
    map_lim: usize,
}
47
impl AtomicFile {
    /// Construct an AtomicFile with default [`Limits`]. `stg` is the main
    /// storage, `upd` is the update file used to make commits atomic.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Construct an AtomicFile with the specified [`Limits`].
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn Storage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
        // Background thread: applies each committed write map via the
        // BasicAtomicFile. It holds `busy` while working so that
        // `wait_complete` can block on the lock instead of spinning hot.
        std::thread::spawn(move || {
            while let Ok((size, map)) = rx.recv() {
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                // Tell the shared state this queued commit has been applied.
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
87
impl Storage for AtomicFile {
    /// Commit pending writes. The write map is first merged into the shared
    /// committed state (so reads still see the data), then handed to the
    /// background thread to be applied durably.
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Back-pressure: if the committed map has grown too large, wait for
        // the background thread to drain it before queueing more.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        // Count this commit as pending BEFORE publishing/sending it.
        cf.todo += 1;
        map.to_storage(cf);
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    /// Read via the uncommitted write map, with gaps filled from the shared
    /// committed state.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    /// Block until the background thread has applied every queued commit.
    /// Locking `busy` blocks while the thread is mid-commit; the loop
    /// re-checks `todo` in case more than one commit is queued.
    fn wait_complete(&self) {
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
130
/// Committed state shared between [`AtomicFile`] and its background thread.
struct CommitFile {
    /// Underlying storage, wrapped in a 256-byte-sector read buffer.
    stg: ReadBufStg<256>,
    /// Writes that are committed but not yet fully applied by the thread.
    map: WMap,
    /// Number of commits queued for the background thread but not yet done.
    todo: usize,
}
139
140impl CommitFile {
141 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
142 Self {
143 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
144 map: WMap::default(),
145 todo: 0,
146 }
147 }
148
149 fn done_one(&mut self) {
150 self.todo -= 1;
151 if self.todo == 0 {
152 self.map = WMap::default();
153 self.stg.reset();
154 }
155 }
156}
157
impl Storage for CommitFile {
    /// Not used: commits are driven by [`AtomicFile`], not through this impl.
    fn commit(&mut self, _size: u64) {
        panic!()
    }

    /// Not used.
    fn size(&self) -> u64 {
        panic!()
    }

    /// Read via the committed write map, with gaps filled from the buffered
    /// underlying storage.
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &self.stg);
    }

    /// Merge a committed write into the map.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    /// Not used: writes arrive via `write_data`.
    fn write(&mut self, _start: u64, _data: &[u8]) {
        panic!()
    }
}
179
/// Interface to underlying storage (file, memory buffer, ...).
pub trait Storage: Send + Sync {
    /// Get the current size of the storage.
    fn size(&self) -> u64;

    /// Read from the storage at offset `start` into `data`.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write `data` to the storage at offset `start`.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write a whole Vec to the storage at offset `start`.
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write `data[off..off+len]` at offset `start`. The default delegates
    /// to `write`; implementations may keep the shared `Data` instead.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish the current set of writes, setting the storage size to `size`.
    fn commit(&mut self, size: u64);

    /// Write a u64 at offset `start`, little-endian.
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read a little-endian u64 from offset `start`.
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Clone the storage handle. Default panics: not every storage is cloneable.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }

    /// Wait for any background commit processing to finish. Default: no-op.
    fn wait_complete(&self) {}
}
229
/// In-memory [`Storage`], backed by a shared, mutex-protected Vec.
#[derive(Default)]
pub struct MemFile {
    // Shared so `clone` produces handles to the same underlying bytes.
    v: Arc<Mutex<Vec<u8>>>,
}
235
236impl MemFile {
237 pub fn new() -> Box<Self> {
239 Box::default()
240 }
241}
242
243impl Storage for MemFile {
244 fn size(&self) -> u64 {
245 let v = self.v.lock().unwrap();
246 v.len() as u64
247 }
248
249 fn read(&self, off: u64, bytes: &mut [u8]) {
250 let off = off as usize;
251 let len = bytes.len();
252 let mut v = self.v.lock().unwrap();
253 if off + len > v.len() {
254 v.resize(off + len, 0);
255 }
256 bytes.copy_from_slice(&v[off..off + len]);
257 }
258
259 fn write(&mut self, off: u64, bytes: &[u8]) {
260 let off = off as usize;
261 let len = bytes.len();
262 let mut v = self.v.lock().unwrap();
263 if off + len > v.len() {
264 v.resize(off + len, 0);
265 }
266 v[off..off + len].copy_from_slice(bytes);
267 }
268
269 fn commit(&mut self, size: u64) {
270 let mut v = self.v.lock().unwrap();
271 v.resize(size as usize, 0);
272 }
273
274 fn clone(&self) -> Box<dyn Storage> {
275 Box::new(Self { v: self.v.clone() })
276 }
277}
278
279use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
280
/// Simple file-backed [`Storage`], using a single shared file handle.
pub struct SimpleFileStorage {
    // Shared so `clone` produces handles to the same open file.
    file: Arc<Mutex<fs::File>>,
}
285
286impl SimpleFileStorage {
287 pub fn new(filename: &str) -> Box<Self> {
289 Box::new(Self {
290 file: Arc::new(Mutex::new(
291 OpenOptions::new()
292 .read(true)
293 .write(true)
294 .create(true)
295 .truncate(false)
296 .open(filename)
297 .unwrap(),
298 )),
299 })
300 }
301}
302
303impl Storage for SimpleFileStorage {
304 fn size(&self) -> u64 {
305 let mut f = self.file.lock().unwrap();
306 f.seek(SeekFrom::End(0)).unwrap()
307 }
308
309 fn read(&self, off: u64, bytes: &mut [u8]) {
310 let mut f = self.file.lock().unwrap();
311 f.seek(SeekFrom::Start(off)).unwrap();
312 let _ = f.read(bytes).unwrap();
313 }
314
315 fn write(&mut self, off: u64, bytes: &[u8]) {
316 let mut f = self.file.lock().unwrap();
317 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
319 {
320 let size = f.seek(SeekFrom::End(0)).unwrap();
321 if off > size {
322 f.set_len(off).unwrap();
323 }
324 }
325 f.seek(SeekFrom::Start(off)).unwrap();
326 let _ = f.write(bytes).unwrap();
327 }
328
329 fn commit(&mut self, size: u64) {
330 let f = self.file.lock().unwrap();
331 f.set_len(size).unwrap();
332 f.sync_all().unwrap();
333 }
334
335 fn clone(&self) -> Box<dyn Storage> {
336 Box::new(Self {
337 file: self.file.clone(),
338 })
339 }
340}
341
/// File-backed [`Storage`] that keeps a pool of open handles to the same
/// file, so concurrent-style use does not contend on one file handle.
#[allow(clippy::vec_box)]
pub struct MultiFileStorage {
    /// Name of the underlying file; new handles are opened on demand.
    filename: String,
    /// Pool of currently-unused handles.
    files: Arc<Mutex<Vec<Box<SimpleFileStorage>>>>,
}
348
349impl MultiFileStorage {
350 pub fn new(filename: &str) -> Box<Self> {
352 Box::new(Self {
353 filename: filename.to_owned(),
354 files: Arc::new(Mutex::new(Vec::new())),
355 })
356 }
357
358 fn get_file(&self) -> Box<SimpleFileStorage> {
359 match self.files.lock().unwrap().pop() {
360 Some(f) => f,
361 _ => SimpleFileStorage::new(&self.filename),
362 }
363 }
364
365 fn put_file(&self, f: Box<SimpleFileStorage>) {
366 self.files.lock().unwrap().push(f);
367 }
368}
369
370impl Storage for MultiFileStorage {
371 fn size(&self) -> u64 {
372 let f = self.get_file();
373 let result = f.size();
374 self.put_file(f);
375 result
376 }
377
378 fn read(&self, off: u64, bytes: &mut [u8]) {
379 let f = self.get_file();
380 f.read(off, bytes);
381 self.put_file(f);
382 }
383
384 fn write(&mut self, off: u64, bytes: &[u8]) {
385 let mut f = self.get_file();
386 f.write(off, bytes);
387 self.put_file(f);
388 }
389
390 fn commit(&mut self, size: u64) {
391 let mut f = self.get_file();
392 f.commit(size);
393 self.put_file(f);
394 }
395
396 fn clone(&self) -> Box<dyn Storage> {
397 Box::new(Self {
398 filename: self.filename.clone(),
399 files: self.files.clone(),
400 })
401 }
402}
403
/// [`Storage`] that does nothing: writes are discarded, size is always zero.
pub struct DummyFile {}
impl DummyFile {
    /// Construct a new DummyFile.
    pub fn new() -> Box<Self> {
        Box::new(Self {})
    }
}
412
impl Storage for DummyFile {
    /// Always zero.
    fn size(&self) -> u64 {
        0
    }

    /// No-op: the output buffer is left unchanged.
    fn read(&self, _off: u64, _bytes: &mut [u8]) {}

    /// No-op: data is discarded.
    fn write(&mut self, _off: u64, _bytes: &[u8]) {}

    /// No-op.
    fn commit(&mut self, _size: u64) {}

    fn clone(&self) -> Box<dyn Storage> {
        Self::new()
    }
}
428
/// Tuning limits for [`AtomicFile`] and [`BasicAtomicFile`].
#[non_exhaustive]
pub struct Limits {
    /// Max entries in the committed write map before `commit` waits for
    /// the background thread (default 5000).
    pub map_lim: usize,
    /// Memory for the committed-state read buffer, in bytes (default 2 MiB).
    pub rbuf_mem: usize,
    /// Size of the main-storage write buffer, in bytes (default 1 MiB).
    pub swbuf: usize,
    /// Size of the update-file write buffer, in bytes (default 1 MiB).
    pub uwbuf: usize,
}
441
442impl Default for Limits {
443 fn default() -> Self {
444 Self {
445 map_lim: 5000,
446 rbuf_mem: 0x200000,
447 swbuf: 0x100000,
448 uwbuf: 0x100000,
449 }
450 }
451}
452
/// Buffers consecutive writes to an underlying storage, flushing whenever
/// a write is non-sequential or the buffer fills.
struct WriteBuffer {
    /// Number of bytes currently buffered.
    ix: usize,
    /// Storage offset of the start of the buffer; u64::MAX is the
    /// "not positioned" sentinel (set on construction and after commit).
    pos: u64,
    /// Underlying storage.
    pub stg: Box<dyn Storage>,
    /// Fixed-size staging buffer.
    buf: Vec<u8>,
}
464
impl WriteBuffer {
    /// New WriteBuffer over `stg` with an internal buffer of `buf_size` bytes.
    pub fn new(stg: Box<dyn Storage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            // Sentinel: no buffered run in progress yet.
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Buffered write of `data` at offset `off`. A write that does not
    /// continue the current buffered run flushes first; writes larger than
    /// the buffer fill and flush repeatedly.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // Non-sequential write: flush and restart the run at `off`.
        // (When pos == u64::MAX, ix is always 0, so this cannot overflow.)
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            // Space remaining in the buffer; flush to make room if full.
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write out any buffered bytes, then reposition the buffer at `new_pos`.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Flush, then commit the underlying storage at `size`.
    pub fn commit(&mut self, size: u64) {
        // u64::MAX resets the position sentinel (no run in progress).
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Buffered write of a little-endian u64 at `start`.
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
518
/// Read-buffering layer over a storage: caches N-byte sectors so that many
/// small reads hit memory instead of the underlying storage.
struct ReadBufStg<const N: usize> {
    /// Underlying storage.
    stg: Box<dyn Storage>,
    /// Sector cache, keyed by sector number.
    buf: Mutex<ReadBuffer<N>>,
    /// Reads larger than this many bytes bypass the cache.
    limit: usize,
}
532
impl<const N: usize> Drop for ReadBufStg<N> {
    /// Discard all cached sectors when dropped.
    fn drop(&mut self) {
        self.reset();
    }
}
538
539impl<const N: usize> ReadBufStg<N> {
540 pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
542 Self {
543 stg,
544 buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
545 limit,
546 }
547 }
548
549 fn reset(&mut self) {
551 self.buf.lock().unwrap().reset();
552 }
553}
554
555impl<const N: usize> Storage for ReadBufStg<N> {
556 fn read(&self, start: u64, data: &mut [u8]) {
558 if data.len() <= self.limit {
559 self.buf.lock().unwrap().read(&*self.stg, start, data);
560 } else {
561 self.stg.read(start, data);
562 }
563 }
564
565 fn size(&self) -> u64 {
567 panic!()
568 }
569
570 fn write(&mut self, _start: u64, _data: &[u8]) {
572 panic!();
573 }
574
575 fn commit(&mut self, _size: u64) {
577 panic!();
578 }
579}
580
/// Cache of N-byte sectors read from an underlying storage.
struct ReadBuffer<const N: usize> {
    /// Cached sectors, keyed by sector number (offset / N).
    map: HashMap<u64, Box<[u8; N]>>,
    /// When the cache reaches this many sectors, it is cleared.
    max_buf: usize,
}
587
impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Discard all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read into `data` from offset `off`, loading whole N-byte sectors
    /// from `stg` into the cache as needed.
    fn read(&mut self, stg: &dyn Storage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            // Absolute offset of the next byte to read (shadows outer `off`).
            let off = off + done as u64;
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            // Bytes available in this sector from `disp` to its end.
            let amount = min(data.len() - done, N - disp);

            // Fetch the sector from the cache, loading it from `stg` on a miss.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction: once the cache reaches max_buf sectors, clear it all.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
621
/// A slice of shared data: the bytes `data[off..off + len]`.
#[derive(Default)]
struct DataSlice {
    /// Shared underlying bytes.
    pub data: Data,
    /// Start of the slice within `data`.
    pub off: usize,
    /// Length of the slice.
    pub len: usize,
}
632
633impl DataSlice {
634 pub fn all(&self) -> &[u8] {
636 &self.data[self.off..self.off + self.len]
637 }
638 pub fn part(&self, off: usize, len: usize) -> &[u8] {
640 &self.data[self.off + off..self.off + off + len]
641 }
642 pub fn trim(&mut self, trim: usize) {
644 self.off += trim;
645 self.len -= trim;
646 }
647 #[allow(dead_code)]
649 pub fn take(&mut self) -> Data {
650 std::mem::take(&mut self.data)
651 }
652}
653
/// Map of writes over a byte range. Each entry is keyed by the END offset
/// of the written range, so a range query from a start offset finds every
/// entry that could overlap it.
#[derive(Default)]
struct WMap {
    map: BTreeMap<u64, DataSlice>,
}
660
impl WMap {
    /// True if there are no pending writes.
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Number of pending write entries.
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Drain the map into a Vec of (start, slice) pairs, in offset order.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            // Keys are end offsets; recover each entry's start offset.
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Copy every pending write into `stg`.
    pub fn to_storage(&self, stg: &mut dyn Storage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    /// Record the write `data[off..off+len]` at `start`, trimming,
    /// splitting or removing existing entries that overlap
    /// [start, start+len). Entries are keyed by their END offset.
    #[cfg(not(feature = "pstd"))]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only entries with key (end offset) > start can overlap.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                // Existing entry covers [es, ee).
                let es = ee - v.len as u64; if es >= end {
                    // Entry is entirely after the new write: no overlap, stop.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write overlaps the FRONT of the entry: trim it.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // Entry is fully covered by the new write: remove it.
                    remove.push(ee);
                } else if end < ee {
                    // New write is strictly INSIDE the entry: keep the part
                    // before it, trim the entry down to the part after it.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write covers the TAIL of the entry: keep the front
                    // part, remove the original entry.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            // Apply deferred removals/insertions (cannot mutate the map
            // structurally while iterating range_mut).
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Same as the non-pstd `write`, but edits the map in place via
    /// pstd's mutable cursor API.
    #[cfg(feature = "pstd")]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                // Existing entry covers [es, ee).
                let es = ee - v.len as u64; if es >= end {
                    // Entry entirely after the new write: step back and stop.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write overlaps the front of the entry: trim it.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // Entry fully covered: remove it.
                    c.remove_prev();
                } else if end < ee {
                    // New write strictly inside the entry: split it.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write covers the tail: shrink the entry in place,
                    // adjusting its key to its new end offset.
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read into `data` from offset `start`, using pending writes where
    /// present and filling the gaps from storage `u`.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn Storage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            for (&end, v) in self.map.range(start + 1..) {
                // Entry covers [es, end); doff is the next unread offset.
                let es = end - v.len as u64; let doff = start + done as u64;
                if es > doff {
                    // Gap before this entry: fill it from underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Copy the overlapping part of the entry.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Remainder past the last entry comes from underlying storage.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
811
/// Atomic storage that does all its work synchronously: writes are buffered
/// in `map`, and `commit` saves them to the update file before applying
/// them to the main storage, so an interrupted commit can be redone.
pub struct BasicAtomicFile {
    /// Main storage (write-buffered).
    stg: WriteBuffer,
    /// Update file: holds commit records until they are safely applied.
    upd: WriteBuffer,
    /// Pending (uncommitted) writes.
    map: WMap,
    /// Writes of the in-progress commit (built in phase 1, applied in phase 2).
    list: Vec<(u64, DataSlice)>,
    /// Size of the main storage as of the last commit.
    size: u64,
}
824
impl BasicAtomicFile {
    /// `stg` is the main storage, `upd` is the update file that makes
    /// commits atomic. Runs crash recovery (`init`) before returning.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Crash recovery: if the update file has a valid header (end != 0),
    /// a previous commit wrote all its records but may not have finished
    /// applying them — replay every saved write into the main storage,
    /// then invalidate the update file.
    fn init(&mut self) {
        // Update-file header: [0..8) = end of record data, [8..16) = commit size.
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            return;
        }
        assert!(end == self.upd.stg.size());
        let mut pos = 16;
        // Records are (start: u64, len: u64, data: [u8; len]) back to back.
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        // Writes are safely applied: truncate the update file to invalidate it.
        self.upd.commit(0);
    }

    /// Phase 1 saves the pending writes (and commit size) into the update
    /// file and marks it valid; phase 2 applies them to the main storage
    /// and invalidates the update file.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Provisional header with end = 0: the update file stays
            // invalid until every record is durably written.
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Write lies beyond the old file size, so it cannot
                    // clobber existing data: apply it directly.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    // Save a (start, len, data) record in the update file.
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            // Flush the records, then set end = pos to mark the update file
            // valid, and commit again so the header itself is durable.
            self.upd.commit(pos); self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            // Phase 2: apply the saved writes to the main storage.
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            // Commit complete: invalidate the update file.
            self.upd.commit(0);
        }
    }
}
917
918impl Storage for BasicAtomicFile {
919 fn commit(&mut self, size: u64) {
920 self.commit_phase(size, 1);
921 self.commit_phase(size, 2);
922 self.size = size;
923 }
924
925 fn size(&self) -> u64 {
926 self.size
927 }
928
929 fn read(&self, start: u64, data: &mut [u8]) {
930 self.map.read(start, data, &*self.stg.stg);
931 }
932
933 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
934 self.map.write(start, data, off, len);
935 }
936
937 fn write(&mut self, start: u64, data: &[u8]) {
938 let len = data.len();
939 let d = Arc::new(data.to_vec());
940 self.write_data(start, d, 0, len);
941 }
942}
943
/// Scale factor for randomized tests, taken from the `TA` environment
/// variable; defaults to 1 when `TA` is unset (or not valid unicode).
/// Panics if `TA` is set but not an unsigned integer.
///
/// Fix: the previous `unwrap_or("1".to_string())` allocated the default
/// string eagerly even when `TA` was set (clippy `or_fun_call`).
#[cfg(test)]
fn test_amount() -> usize {
    match std::env::var("TA") {
        Ok(s) => s.parse().expect("TA must be an unsigned integer"),
        Err(_) => 1,
    }
}
949
/// Randomized equivalence test: perform identical random reads, writes and
/// commits on an AtomicFile (backed by MemFiles) and on a plain MemFile,
/// and check that every read agrees.
#[test]
fn test_atomic_file() {
    use rand::Rng;
    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new() );
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Random small offset and length.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Write the same random bytes to both storages.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Read from both and compare.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
            // Occasionally commit both at the same size.
            if rng.r#gen::<usize>() % 50 == 0 {
                s1.commit(200);
                s2.commit(200);
            }
        }
    }
}