1use rustc_hash::FxHashMap as HashMap;
6use std::cmp::min;
7use std::sync::{Arc, Mutex, RwLock};
8
9#[cfg(feature = "pstd")]
10use pstd::collections::BTreeMap;
11#[cfg(not(feature = "pstd"))]
12use std::collections::BTreeMap;
13
/// Reference-counted byte buffer, shared between write maps and the background commit thread.
pub type Data = Arc<Vec<u8>>;
16
/// Storage wrapper that makes commits atomic, offloading the actual
/// two-phase commit work to a background thread.
pub struct AtomicFile {
    /// Writes made since the last commit (not yet handed to the worker).
    map: WMap,
    /// Writes currently being persisted by the worker thread; readable meanwhile.
    cf: Arc<RwLock<CommitFile>>,
    /// Logical file size as of the most recent `commit` call.
    size: u64,
    /// Channel for handing (size, map) commit batches to the worker thread.
    tx: std::sync::mpsc::Sender<(u64, WMap)>,
    /// Held by the worker while it processes a commit; used by `wait_complete`.
    busy: Arc<Mutex<()>>,
    /// When the pending map in `cf` exceeds this many entries, commits block.
    map_lim: usize,
}
47
impl AtomicFile {
    /// Creates an `AtomicFile` with default [`Limits`]. `stg` is the main
    /// storage, `upd` is temporary storage used during commits.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
        Self::new_with_limits(stg, upd, &Limits::default())
    }

    /// Creates an `AtomicFile` with the specified limits, spawning the
    /// background thread that performs the actual commits.
    pub fn new_with_limits(
        stg: Box<dyn Storage>,
        upd: Box<dyn Storage>,
        lim: &Limits,
    ) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
        let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
        // Worker thread: applies each queued batch through the synchronous
        // BasicAtomicFile, holding `busy` so wait_complete can park on it,
        // then tells the shared CommitFile one batch is done.
        std::thread::spawn(move || {
            while let Ok((size, map)) = rx.recv() {
                let _lock = busy1.lock();
                baf.map = map;
                baf.commit(size);
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
            busy,
            map_lim: lim.map_lim,
        })
    }
}
87
impl Storage for AtomicFile {
    /// Queue the writes made since the last commit to be persisted at `size`
    /// by the background thread. Returns without waiting for durability.
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.is_empty() {
            return;
        }
        // Back-pressure: if too many writes are still in flight, wait for
        // the worker thread to drain before queueing more.
        if self.cf.read().unwrap().map.len() > self.map_lim {
            self.wait_complete();
        }
        let map = std::mem::take(&mut self.map);
        let cf = &mut *self.cf.write().unwrap();
        cf.todo += 1;
        // Merge this batch into the shared CommitFile so reads can see it
        // while the worker persists it in the background.
        map.to_storage(cf);
        self.tx.send((size, map)).unwrap();
    }

    fn size(&self) -> u64 {
        self.size
    }

    /// Read first from uncommitted writes, falling back to the in-flight
    /// commit data (which itself falls back to the underlying storage).
    fn read(&self, start: u64, data: &mut [u8]) {
        self.map.read(start, data, &*self.cf.read().unwrap());
    }

    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.map.write(start, data, off, len);
    }

    fn write(&mut self, start: u64, data: &[u8]) {
        let len = data.len();
        let d = Arc::new(data.to_vec());
        self.write_data(start, d, 0, len);
    }

    /// Block until the worker thread has drained all queued commits.
    /// Locking `busy` parks this thread while a commit is in progress.
    fn wait_complete(&self) {
        while self.cf.read().unwrap().todo != 0 {
            let _x = self.busy.lock();
        }
    }
}
130
/// Interface for a byte-addressable storage backend.
pub trait Storage: Send + Sync {
    /// Current size of the storage in bytes.
    fn size(&self) -> u64;

    /// Read `data.len()` bytes starting at offset `start` into `data`.
    fn read(&self, start: u64, data: &mut [u8]);

    /// Write `data` at offset `start`.
    fn write(&mut self, start: u64, data: &[u8]);

    /// Write an owned `Vec<u8>` at offset `start` (takes ownership, avoids a copy).
    fn write_vec(&mut self, start: u64, data: Vec<u8>) {
        let len = data.len();
        let d = Arc::new(data);
        self.write_data(start, d, 0, len);
    }

    /// Write the shared slice `data[off..off + len]` at offset `start`.
    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
        self.write(start, &data[off..off + len]);
    }

    /// Finish a write transaction, setting the final size of the storage.
    fn commit(&mut self, size: u64);

    /// Write a u64 at offset `start` in little-endian format.
    fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }

    /// Read a little-endian u64 from offset `start`.
    fn read_u64(&self, start: u64) -> u64 {
        let mut bytes = [0; 8];
        self.read(start, &mut bytes);
        u64::from_le_bytes(bytes)
    }

    /// Clone this storage handle. Default panics: only implementations
    /// whose state is shared (e.g. via `Arc`) can support cloning.
    fn clone(&self) -> Box<dyn Storage> {
        panic!()
    }

    /// Wait for any outstanding background commits (no-op by default).
    fn wait_complete(&self) {}
}
180
/// In-memory storage backed by a shared, lock-protected byte vector.
#[derive(Default)]
pub struct MemFile {
    v: Arc<Mutex<Vec<u8>>>,
}

impl MemFile {
    /// Creates an empty in-memory file.
    pub fn new() -> Box<Self> {
        Box::new(Self::default())
    }
}
193
194impl Storage for MemFile {
195 fn size(&self) -> u64 {
196 let v = self.v.lock().unwrap();
197 v.len() as u64
198 }
199
200 fn read(&self, off: u64, bytes: &mut [u8]) {
201 let off = off as usize;
202 let len = bytes.len();
203 let mut v = self.v.lock().unwrap();
204 if off + len > v.len() {
205 v.resize(off + len, 0);
206 }
207 bytes.copy_from_slice(&v[off..off + len]);
208 }
209
210 fn write(&mut self, off: u64, bytes: &[u8]) {
211 let off = off as usize;
212 let len = bytes.len();
213 let mut v = self.v.lock().unwrap();
214 if off + len > v.len() {
215 v.resize(off + len, 0);
216 }
217 v[off..off + len].copy_from_slice(bytes);
218 }
219
220 fn commit(&mut self, size: u64) {
221 let mut v = self.v.lock().unwrap();
222 v.resize(size as usize, 0);
223 }
224
225 fn clone(&self) -> Box<dyn Storage> {
226 Box::new(Self { v: self.v.clone() })
227 }
228}
229
230use std::{fs, fs::OpenOptions, io::Read, io::Seek, io::SeekFrom, io::Write};
231
/// Storage backed by a single operating-system file.
pub struct SimpleFileStorage {
    file: Arc<Mutex<fs::File>>,
}

impl SimpleFileStorage {
    /// Opens (creating if necessary, without truncating) `filename`
    /// for reading and writing. Panics if the file cannot be opened.
    pub fn new(filename: &str) -> Box<Self> {
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(filename)
            .unwrap();
        Box::new(Self {
            file: Arc::new(Mutex::new(f)),
        })
    }
}
253
254impl Storage for SimpleFileStorage {
255 fn size(&self) -> u64 {
256 let mut f = self.file.lock().unwrap();
257 f.seek(SeekFrom::End(0)).unwrap()
258 }
259
260 fn read(&self, off: u64, bytes: &mut [u8]) {
261 let mut f = self.file.lock().unwrap();
262 f.seek(SeekFrom::Start(off)).unwrap();
263 let _ = f.read(bytes).unwrap();
264 }
265
266 fn write(&mut self, off: u64, bytes: &[u8]) {
267 let mut f = self.file.lock().unwrap();
268 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
270 {
271 let size = f.seek(SeekFrom::End(0)).unwrap();
272 if off > size {
273 f.set_len(off).unwrap();
274 }
281 }
282 f.seek(SeekFrom::Start(off)).unwrap();
283 let _ = f.write(bytes).unwrap();
284 }
285
286 fn commit(&mut self, size: u64) {
287 let f = self.file.lock().unwrap();
288 f.set_len(size).unwrap();
289 f.sync_all().unwrap();
290 }
291
292 fn clone(&self) -> Box<dyn Storage> {
293 Box::new(Self {
294 file: self.file.clone(),
295 })
296 }
297}
298
299#[allow(clippy::vec_box)]
301pub struct MultiFileStorage {
302 filename: String,
303 files: Arc<Mutex<Vec<Box<SimpleFileStorage>>>>,
304}
305
306impl MultiFileStorage {
307 pub fn new(filename: &str) -> Box<Self> {
309 Box::new(Self {
310 filename: filename.to_string(),
311 files: Arc::new(Mutex::new(Vec::new())),
312 })
313 }
314
315 fn get_file(&self) -> Box<SimpleFileStorage> {
316 match self.files.lock().unwrap().pop() {
317 Some(f) => f,
318 _ => SimpleFileStorage::new(&self.filename),
319 }
320 }
321
322 fn put_file(&self, f: Box<SimpleFileStorage>) {
323 self.files.lock().unwrap().push(f);
324 }
325}
326
327impl Storage for MultiFileStorage {
328 fn size(&self) -> u64 {
329 let f = self.get_file();
330 let result = f.size();
331 self.put_file(f);
332 result
333 }
334
335 fn read(&self, off: u64, bytes: &mut [u8]) {
336 let f = self.get_file();
337 f.read(off, bytes);
338 self.put_file(f);
339 }
340
341 fn write(&mut self, off: u64, bytes: &[u8]) {
342 let mut f = self.get_file();
343 f.write(off, bytes);
344 self.put_file(f);
345 }
346
347 fn commit(&mut self, size: u64) {
348 let mut f = self.get_file();
349 f.commit(size);
350 self.put_file(f);
351 }
352
353 fn clone(&self) -> Box<dyn Storage> {
354 Box::new(Self {
355 filename: self.filename.clone(),
356 files: self.files.clone(),
357 })
358 }
359}
360
361pub struct DummyFile {}
363impl DummyFile {
364 pub fn new() -> Box<Self> {
366 Box::new(Self {})
367 }
368}
369
370impl Storage for DummyFile {
371 fn size(&self) -> u64 {
372 0
373 }
374
375 fn read(&self, _off: u64, _bytes: &mut [u8]) {}
376
377 fn write(&mut self, _off: u64, _bytes: &[u8]) {}
378
379 fn commit(&mut self, _size: u64) {}
380
381 fn clone(&self) -> Box<dyn Storage> {
382 Self::new()
383 }
384}
385
/// Tuning limits for [`AtomicFile`] buffering and back-pressure.
#[non_exhaustive]
pub struct Limits {
    /// Max number of pending in-flight writes before commits block.
    pub map_lim: usize,
    /// Memory (bytes) for the commit-file read buffer.
    pub rbuf_mem: usize,
    /// Size (bytes) of the main storage write buffer.
    pub swbuf: usize,
    /// Size (bytes) of the update (journal) storage write buffer.
    pub uwbuf: usize,
}

impl Default for Limits {
    fn default() -> Self {
        Self {
            map_lim: 5000,
            rbuf_mem: 0x200000, // 2 MiB
            swbuf: 0x100000,    // 1 MiB
            uwbuf: 0x100000,    // 1 MiB
        }
    }
}
409
/// Shared view of the writes currently being persisted in the background,
/// layered over buffered access to the underlying storage.
struct CommitFile {
    /// Buffered reads of the underlying storage (256-byte sectors).
    stg: ReadBufStg<256>,
    /// Union of the write maps of all commits still in flight.
    map: WMap,
    /// Number of commits queued but not yet completed by the worker.
    todo: usize,
}

impl CommitFile {
    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
        Self {
            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
            map: WMap::default(),
            todo: 0,
        }
    }

    /// Called by the worker when one commit finishes. Once no commits remain,
    /// the merged map and the read cache are discarded.
    /// Note: underflows if not balanced with a prior `todo += 1`.
    fn done_one(&mut self) {
        self.todo -= 1;
        if self.todo == 0 {
            self.map = WMap::default();
            self.stg.reset();
        }
    }
}
436
437impl Storage for CommitFile {
438 fn commit(&mut self, _size: u64) {
439 panic!()
440 }
441
442 fn size(&self) -> u64 {
443 panic!()
444 }
445
446 fn read(&self, start: u64, data: &mut [u8]) {
447 self.map.read(start, data, &self.stg);
448 }
449
450 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
451 self.map.write(start, data, off, len);
452 }
453
454 fn write(&mut self, _start: u64, _data: &[u8]) {
455 panic!()
456 }
457}
458
/// Buffer that coalesces sequential writes before passing them to `stg`.
struct WriteBuffer {
    /// Number of buffered bytes (index of the next free byte in `buf`).
    ix: usize,
    /// Storage offset corresponding to `buf[0]`; `u64::MAX` when unset.
    pos: u64,
    pub stg: Box<dyn Storage>,
    buf: Vec<u8>,
}

impl WriteBuffer {
    pub fn new(stg: Box<dyn Storage>, buf_size: usize) -> Self {
        Self {
            ix: 0,
            pos: u64::MAX,
            stg,
            buf: vec![0; buf_size],
        }
    }

    /// Buffer a write at offset `off`, flushing first if it is not
    /// contiguous with the buffered data, and whenever the buffer fills.
    pub fn write(&mut self, off: u64, data: &[u8]) {
        // Non-sequential write: flush what we have and restart at `off`.
        // (`pos` is only u64::MAX while `ix` is 0, so this cannot overflow —
        // flush always resets both together.)
        if self.pos + self.ix as u64 != off {
            self.flush(off);
        }
        let mut done: usize = 0;
        let mut todo: usize = data.len();
        while todo > 0 {
            let mut n: usize = self.buf.len() - self.ix;
            if n == 0 {
                // Buffer full: flush and continue at the current offset.
                self.flush(off + done as u64);
                n = self.buf.len();
            }
            if n > todo {
                n = todo;
            }
            self.buf[self.ix..self.ix + n].copy_from_slice(&data[done..done + n]);
            todo -= n;
            done += n;
            self.ix += n;
        }
    }

    /// Write any buffered bytes to storage, then reset the buffer to `new_pos`.
    fn flush(&mut self, new_pos: u64) {
        if self.ix > 0 {
            self.stg.write(self.pos, &self.buf[0..self.ix]);
        }
        self.ix = 0;
        self.pos = new_pos;
    }

    /// Flush any buffered bytes and commit the underlying storage at `size`.
    pub fn commit(&mut self, size: u64) {
        self.flush(u64::MAX);
        self.stg.commit(size);
    }

    /// Buffer a little-endian u64 write.
    pub fn write_u64(&mut self, start: u64, value: u64) {
        self.write(start, &value.to_le_bytes());
    }
}
524
/// Storage layer that caches fixed N-byte sectors of the underlying
/// storage to speed up small reads.
struct ReadBufStg<const N: usize> {
    stg: Box<dyn Storage>,
    /// Sector cache; a `Mutex` so `read(&self)` can populate it.
    buf: Mutex<ReadBuffer<N>>,
    /// Reads longer than this bypass the cache entirely.
    limit: usize,
}

impl<const N: usize> Drop for ReadBufStg<N> {
    // Release the cached sectors when the layer is dropped.
    fn drop(&mut self) {
        self.reset();
    }
}

impl<const N: usize> ReadBufStg<N> {
    /// `limit`: max read size served from cache; `max_buf`: max cached sectors.
    pub fn new(stg: Box<dyn Storage>, limit: usize, max_buf: usize) -> Self {
        Self {
            stg,
            buf: Mutex::new(ReadBuffer::<N>::new(max_buf)),
            limit,
        }
    }

    /// Discard all cached sectors (called when cached data may be stale).
    fn reset(&mut self) {
        self.buf.lock().unwrap().reset();
    }
}
560
561impl<const N: usize> Storage for ReadBufStg<N> {
562 fn read(&self, start: u64, data: &mut [u8]) {
564 if data.len() <= self.limit {
565 self.buf.lock().unwrap().read(&*self.stg, start, data);
566 } else {
567 self.stg.read(start, data);
568 }
569 }
570
571 fn size(&self) -> u64 {
573 panic!()
574 }
575
576 fn write(&mut self, _start: u64, _data: &[u8]) {
578 panic!();
579 }
580
581 fn commit(&mut self, _size: u64) {
583 panic!();
584 }
585}
586
/// Cache of N-byte sectors read from an underlying storage.
struct ReadBuffer<const N: usize> {
    /// Cached sectors, keyed by sector number (offset / N).
    map: HashMap<u64, Box<[u8; N]>>,
    /// Once the cache holds this many sectors it is cleared wholesale.
    max_buf: usize,
}

impl<const N: usize> ReadBuffer<N> {
    fn new(max_buf: usize) -> Self {
        Self {
            map: HashMap::default(),
            max_buf,
        }
    }

    /// Drop all cached sectors.
    fn reset(&mut self) {
        self.map.clear();
    }

    /// Read into `data` at `off`, fetching missing sectors from `stg` and
    /// serving the rest from cache.
    fn read(&mut self, stg: &dyn Storage, off: u64, data: &mut [u8]) {
        let mut done = 0;
        while done < data.len() {
            let off = off + done as u64; // absolute offset of the next byte
            let sector = off / N as u64;
            let disp = (off % N as u64) as usize;
            let amount = min(data.len() - done, N - disp);

            // Load the sector from storage on first use.
            let p = self.map.entry(sector).or_insert_with(|| {
                let mut p: Box<[u8; N]> = vec![0; N].try_into().unwrap();
                stg.read(sector * N as u64, &mut *p);
                p
            });
            data[done..done + amount].copy_from_slice(&p[disp..disp + amount]);
            done += amount;
        }
        // Crude eviction: drop the entire cache once it reaches the limit.
        if self.map.len() >= self.max_buf {
            self.reset();
        }
    }
}
627
628#[derive(Default)]
629struct DataSlice {
631 pub data: Data,
633 pub off: usize,
635 pub len: usize,
637}
638
639impl DataSlice {
640 pub fn all(&self) -> &[u8] {
642 &self.data[self.off..self.off + self.len]
643 }
644 pub fn part(&self, off: usize, len: usize) -> &[u8] {
646 &self.data[self.off + off..self.off + off + len]
647 }
648 pub fn trim(&mut self, trim: usize) {
650 self.off += trim;
651 self.len -= trim;
652 }
653 #[allow(dead_code)]
655 pub fn take(&mut self) -> Data {
656 std::mem::take(&mut self.data)
657 }
658}
659
/// Ordered map of non-overlapping buffered writes.
#[derive(Default)]
struct WMap {
    /// Keyed by the END offset of each slice (start = end - slice.len),
    /// so a range query from `start + 1` finds every slice that could
    /// overlap a write beginning at `start`.
    map: BTreeMap<u64, DataSlice>,
}

impl WMap {
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// Drain the map into a vec of (start, slice) pairs, ordered by offset.
    pub fn convert_to_vec(&mut self) -> Vec<(u64, DataSlice)> {
        let map = std::mem::take(&mut self.map);
        let mut result = Vec::with_capacity(map.len());
        for (end, v) in map {
            let start = end - v.len as u64;
            result.push((start, v));
        }
        result
    }

    /// Copy every buffered write into `stg` (data buffers are shared, not copied).
    pub fn to_storage(&self, stg: &mut dyn Storage) {
        for (end, v) in self.map.iter() {
            let start = end - v.len as u64;
            stg.write_data(start, v.data.clone(), v.off, v.len);
        }
    }

    /// Record a write of `data[off..off + len]` at offset `start`, trimming,
    /// splitting or removing any existing slices the new write overlaps.
    #[cfg(not(feature = "pstd"))]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let (mut insert, mut remove) = (Vec::new(), Vec::new());
            let end = start + len as u64;
            // Only slices whose end is after `start` can overlap the new write.
            for (ee, v) in self.map.range_mut(start + 1..) {
                let ee = *ee;
                // The existing slice covers es..ee.
                let es = ee - v.len as u64; if es >= end {
                    // Existing slice lies entirely after the new write.
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write overlaps the front of the existing slice.
                        v.trim((end - es) as usize);
                        break;
                    }
                    // New write completely covers the existing slice.
                    remove.push(ee);
                } else if end < ee {
                    // New write lands in the middle: keep the front part
                    // (inserted below) and trim the back part in place.
                    insert.push((es, v.data.clone(), v.off, (start - es) as usize));
                    v.trim((end - es) as usize);
                    break;
                } else {
                    // New write overlaps the back of the existing slice:
                    // keep only the front part.
                    insert.push((es, v.take(), v.off, (start - es) as usize));
                    remove.push(ee);
                }
            }
            for end in remove {
                self.map.remove(&end);
            }
            for (start, data, off, len) in insert {
                self.map
                    .insert(start + len as u64, DataSlice { data, off, len });
            }
            self.map
                .insert(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Record a write of `data[off..off + len]` at offset `start`
    /// (cursor-based variant for the `pstd` BTreeMap).
    #[cfg(feature = "pstd")]
    pub fn write(&mut self, start: u64, data: Data, off: usize, len: usize) {
        if len != 0 {
            let end = start + len as u64;
            let mut c = self
                .map
                .lower_bound_mut(std::ops::Bound::Excluded(&start))
                .with_mutable_key();
            while let Some((eend, v)) = c.next() {
                let ee = *eend;
                // The existing slice covers es..ee.
                let es = ee - v.len as u64; if es >= end {
                    // Existing slice lies entirely after the new write.
                    c.prev();
                    break;
                } else if start <= es {
                    if end < ee {
                        // New write overlaps the front of the existing slice.
                        v.trim((end - es) as usize);
                        c.prev();
                        break;
                    }
                    // New write completely covers the existing slice.
                    c.remove_prev();
                } else if end < ee {
                    // New write lands in the middle: insert the front part,
                    // trim the back part in place.
                    let (data, off, len) = (v.data.clone(), v.off, (start - es) as usize);
                    v.trim((end - es) as usize);
                    c.prev();
                    c.insert_before_unchecked(es + len as u64, DataSlice { data, off, len });
                    break;
                } else {
                    // New write overlaps the back: shrink the existing slice,
                    // adjusting its end key in place.
                    v.len = (start - es) as usize;
                    *eend = es + v.len as u64;
                }
            }
            c.insert_after_unchecked(start + len as u64, DataSlice { data, off, len });
        }
    }

    /// Read into `data` from offset `start`: buffered slices take priority,
    /// and any gaps are filled by reading from `u`.
    pub fn read(&self, start: u64, data: &mut [u8], u: &dyn Storage) {
        let len = data.len();
        if len != 0 {
            let mut done = 0;
            for (&end, v) in self.map.range(start + 1..) {
                let es = end - v.len as u64; let doff = start + done as u64;
                if es > doff {
                    // Gap before this slice: fill it from the underlying storage.
                    let a = min(len - done, (es - doff) as usize);
                    u.read(doff, &mut data[done..done + a]);
                    done += a;
                    if done == len {
                        return;
                    }
                }
                // Copy the overlapping portion of the buffered slice.
                let skip = (start + done as u64 - es) as usize;
                let a = min(len - done, v.len - skip);
                data[done..done + a].copy_from_slice(v.part(skip, a));
                done += a;
                if done == len {
                    return;
                }
            }
            // Tail beyond the last buffered slice.
            u.read(start + done as u64, &mut data[done..]);
        }
    }
}
817
/// Synchronous atomic-commit storage: writes are journaled to an update
/// file first, so a crash mid-commit can be rolled forward on restart.
pub struct BasicAtomicFile {
    /// Buffered main storage.
    stg: WriteBuffer,
    /// Buffered update (journal) storage.
    upd: WriteBuffer,
    /// Writes accumulated since the last commit.
    map: WMap,
    /// Writes of the commit in progress (filled in phase 1, applied in phase 2).
    list: Vec<(u64, DataSlice)>,
    /// Size of the main storage as of the last completed commit.
    size: u64,
}

impl BasicAtomicFile {
    /// `stg` is the main storage, `upd` is temporary storage for the
    /// commit journal. Performs crash recovery before returning.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
        let size = stg.size();
        let mut result = Box::new(Self {
            stg: WriteBuffer::new(stg, lim.swbuf),
            upd: WriteBuffer::new(upd, lim.uwbuf),
            map: WMap::default(),
            list: Vec::new(),
            size,
        });
        result.init();
        result
    }

    /// Replay any journal left behind by an interrupted commit.
    fn init(&mut self) {
        // Journal header: [0..8) = journal end offset, [8..16) = committed size.
        let end = self.upd.stg.read_u64(0);
        let size = self.upd.stg.read_u64(8);
        if end == 0 {
            return; // No interrupted commit to recover.
        }
        assert!(end == self.upd.stg.size());
        // Each record: start (u64), len (u64), then `len` bytes of data.
        let mut pos = 16;
        while pos < end {
            let start = self.upd.stg.read_u64(pos);
            pos += 8;
            let len = self.upd.stg.read_u64(pos);
            pos += 8;
            let mut buf = vec![0; len as usize];
            self.upd.stg.read(pos, &mut buf);
            pos += len;
            self.stg.write(start, &buf);
        }
        self.stg.commit(size);
        self.upd.commit(0); // Truncate the journal: recovery complete.
    }

    /// Phase 1: durably journal the pending writes to the update file.
    /// Phase 2: apply the journaled writes to main storage and clear the journal.
    pub fn commit_phase(&mut self, size: u64, phase: u8) {
        if self.map.is_empty() && self.list.is_empty() {
            return;
        }
        if phase == 1 {
            self.list = self.map.convert_to_vec();

            // Mark the journal as empty (end = 0) while it is being built,
            // so a crash at this point recovers as "nothing to do".
            self.upd.write_u64(0, 0);
            self.upd.write_u64(8, size);
            self.upd.commit(16); let mut stg_written = false;
            let mut pos: u64 = 16;
            for (start, v) in self.list.iter() {
                let (start, len, data) = (*start, v.len as u64, v.all());
                if start >= self.size {
                    // Writes past the old file size cannot corrupt existing
                    // data, so they can go straight to main storage.
                    stg_written = true;
                    self.stg.write(start, data);
                } else {
                    self.upd.write_u64(pos, start);
                    pos += 8;
                    self.upd.write_u64(pos, len);
                    pos += 8;
                    self.upd.write(pos, data);
                    pos += len;
                }
            }
            if stg_written {
                self.stg.commit(size);
            }
            // Persist the journal body first, then its header; the commit
            // counts as started only once the header is durable.
            self.upd.commit(pos); self.upd.write_u64(0, pos);
            self.upd.write_u64(8, size);
            self.upd.commit(pos);
        } else {
            // Phase 2: apply the journaled writes to the main storage.
            for (start, v) in self.list.iter() {
                if *start < self.size {
                    self.stg.write(*start, v.all());
                }
            }
            self.list.clear();
            self.stg.commit(size);
            self.upd.commit(0); // Clear the journal.
        }
    }
}
923
924impl Storage for BasicAtomicFile {
925 fn commit(&mut self, size: u64) {
926 self.commit_phase(size, 1);
927 self.commit_phase(size, 2);
928 self.size = size;
929 }
930
931 fn size(&self) -> u64 {
932 self.size
933 }
934
935 fn read(&self, start: u64, data: &mut [u8]) {
936 self.map.read(start, data, &*self.stg.stg);
937 }
938
939 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
940 self.map.write(start, data, off, len);
941 }
942
943 fn write(&mut self, start: u64, data: &[u8]) {
944 let len = data.len();
945 let d = Arc::new(data.to_vec());
946 self.write_data(start, d, 0, len);
947 }
948}
949
/// Test-size multiplier taken from the `TA` environment variable (default 1).
/// Panics if `TA` is set but not a valid `usize` (same as before).
#[cfg(test)]
fn test_amount() -> usize {
    // unwrap_or_else avoids allocating the default String when TA is set.
    std::env::var("TA")
        .unwrap_or_else(|_| "1".to_string())
        .parse()
        .unwrap()
}
955
/// Differential test: AtomicFile over MemFiles must behave exactly like a
/// plain MemFile under a random sequence of overlapping reads and writes.
#[test]
fn test_atomic_file() {
    use rand::Rng;
    let ta = test_amount();
    println!(" Test amount={}", ta);

    let mut rng = rand::thread_rng();

    for _ in 0..100 {
        let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut s2 = MemFile::new();

        for _ in 0..1000 * ta {
            // Small offset/length window to force frequent slice overlaps.
            let off: usize = rng.r#gen::<usize>() % 100;
            let mut len = 1 + rng.r#gen::<usize>() % 20;
            let w: bool = rng.r#gen();
            if w {
                // Mirror a random write into both storages.
                let mut bytes = Vec::new();
                while len > 0 {
                    len -= 1;
                    let b: u8 = rng.r#gen::<u8>();
                    bytes.push(b);
                }
                s1.write(off as u64, &bytes);
                s2.write(off as u64, &bytes);
            } else {
                // Both storages must agree on every read at all times.
                let mut b2 = vec![0; len];
                let mut b3 = vec![0; len];
                s1.read(off as u64, &mut b2);
                s2.read(off as u64, &mut b3);
                assert!(b2 == b3);
            }
        }
        s1.commit(200);
        s2.commit(200);
    }
}