1use crate::{
2 buf::ReadBufStg, wmap::WMap, Arc, BasicAtomicFile, Data, Limits, Mutex, RwLock, Storage,
3};
4
5pub struct AtomicFile {
8 map: WMap,
9 cf: Arc<RwLock<CommitFile>>,
10 size: u64,
11 tx: std::sync::mpsc::Sender<(u64, WMap)>,
12 busy: Arc<Mutex<()>>,
13 map_lim: usize,
14}
15
16impl AtomicFile {
17 pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
19 Self::new_with_limits(stg, upd, &Limits::default())
20 }
21
22 pub fn new_with_limits(
24 stg: Box<dyn Storage>,
25 upd: Box<dyn Storage>,
26 lim: &Limits,
27 ) -> Box<Self> {
28 let size = stg.size();
29 let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
30 let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
31 let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
32 let busy = Arc::new(Mutex::new(())); let (cf1, busy1) = (cf.clone(), busy.clone());
36 std::thread::spawn(move || {
37 while let Ok((size, map)) = rx.recv() {
38 let _lock = busy1.lock();
39 baf.map = map;
40 baf.commit(size);
41 cf1.write().unwrap().done_one();
42 }
43 });
44 Box::new(Self {
45 map: WMap::default(),
46 cf,
47 size,
48 tx,
49 busy,
50 map_lim: lim.map_lim,
51 })
52 }
53}
54
55impl Storage for AtomicFile {
56 fn commit(&mut self, size: u64) {
57 self.size = size;
58 if self.map.is_empty() {
59 return;
60 }
61 if self.cf.read().unwrap().map.len() > self.map_lim {
62 self.wait_complete();
63 }
64 let map = std::mem::take(&mut self.map);
65 let cf = &mut *self.cf.write().unwrap();
66 cf.todo += 1;
67 map.to_storage(cf);
68 self.tx.send((size, map)).unwrap();
69 }
70
71 fn size(&self) -> u64 {
72 self.size
73 }
74
75 fn read(&self, start: u64, data: &mut [u8]) {
76 self.map.read(start, data, &*self.cf.read().unwrap());
77 }
78
79 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
80 self.map.write(start, data, off, len);
81 }
82
83 fn write(&mut self, start: u64, data: &[u8]) {
84 let len = data.len();
85 let d = Arc::new(data.to_vec());
86 self.write_data(start, d, 0, len);
87 }
88
89 fn wait_complete(&self) {
90 while self.cf.read().unwrap().todo != 0 {
91 #[cfg(feature = "log")]
92 println!("AtomicFile::wait_complete - waiting for writer process");
93 let _x = self.busy.lock();
94 }
95 }
96}
97
98struct CommitFile {
99 stg: Box<dyn Storage>,
100 map: WMap,
101 todo: usize,
102}
103
104impl CommitFile {
105 fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
106 Self {
107 stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
108 map: WMap::default(),
109 todo: 0,
110 }
111 }
112
113 fn done_one(&mut self) {
114 self.todo -= 1;
115 if self.todo == 0 {
116 self.map = WMap::default();
117 self.stg.reset();
118 }
119 }
120}
121
122impl Storage for CommitFile {
123 fn commit(&mut self, _size: u64) {
124 panic!()
125 }
126
127 fn size(&self) -> u64 {
128 panic!()
129 }
130
131 fn read(&self, start: u64, data: &mut [u8]) {
132 self.map.read(start, data, &*self.stg);
133 }
134
135 fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
136 self.map.write(start, data, off, len);
137 }
138
139 fn write(&mut self, _start: u64, _data: &[u8]) {
140 panic!()
141 }
142}
143
144#[test]
145pub fn test() {
146 use crate::stg::MemFile;
147 use rand::Rng;
148 let ta = crate::test::test_amount();
151
152 let mut rng = rand::thread_rng();
153
154 for _ in 0..1000 {
155 let mut s1 = AtomicFile::new(MemFile::new(), MemFile::new());
156 let mut s2 = MemFile::new();
157
158 for _ in 0..1000 * ta {
159 let off: usize = rng.gen::<usize>() % 100;
160 let mut len = 1 + rng.gen::<usize>() % 20;
161 let w: bool = rng.gen();
162 if w {
163 let mut bytes = Vec::new();
164 while len > 0 {
165 len -= 1;
166 let b: u8 = rng.gen::<u8>();
167 bytes.push(b);
168 }
169 s1.write(off as u64, &bytes);
170 s2.write(off as u64, &bytes);
171 } else {
172 let mut b2 = vec![0; len];
173 let mut b3 = vec![0; len];
174 s1.read(off as u64, &mut b2);
175 s2.read(off as u64, &mut b3);
176 assert!(b2 == b3);
177 }
178 }
179 }
180}