1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fmt::{self, Debug};
3use std::hash::{Hash, Hasher};
4
5use linked_hash_map::LinkedHashMap;
6
7use super::trans::Action;
8use super::{Eid, Id, Txid};
9use base::crypto::{HashKey, HASHKEY_SIZE};
10use error::{Error, Result};
11use volume::{
12 AllocatorRef, Arm, ArmAccess, Armor, Seq, VolumeRef, VolumeWalArmor,
13};
14
15#[derive(Debug, Clone, Copy, PartialEq, Deserialize, Serialize)]
17pub enum EntityType {
18 Cow,
19 Direct,
20}
21
22#[derive(Debug, Clone, Deserialize, Serialize)]
24pub struct Entry {
25 id: Eid,
26 action: Action,
27 ent_type: EntityType,
28 arm: Arm,
29}
30
31impl PartialEq for Entry {
32 fn eq(&self, other: &Entry) -> bool {
33 self.id == other.id
34 }
35}
36
37impl Eq for Entry {}
38
39#[derive(Debug, Clone, Default, Eq, Deserialize, Serialize)]
41pub struct Wal {
42 id: Eid,
43 seq: u64,
44 arm: Arm,
45 txid: Txid,
46 entries: LinkedHashMap<Eid, Entry>,
47}
48
49impl Wal {
50 const ID_HASH_KEY: [u8; HASHKEY_SIZE] = [42u8; HASHKEY_SIZE];
52
53 pub fn new(txid: Txid) -> Self {
54 Wal {
55 id: Self::derive_id(txid),
56 seq: 0,
57 arm: Arm::default(),
58 txid,
59 entries: LinkedHashMap::new(),
60 }
61 }
62
63 fn derive_id(txid: Txid) -> Eid {
65 let mut hash_key = HashKey::new_empty();
66 hash_key.copy(&Self::ID_HASH_KEY[..]);
67 txid.derive_id(&hash_key)
68 }
69
70 #[inline]
71 pub fn add_entry(
72 &mut self,
73 id: &Eid,
74 action: Action,
75 ent_type: EntityType,
76 arm: Arm,
77 ) {
78 self.entries.insert(
79 id.clone(),
80 Entry {
81 id: id.clone(),
82 action,
83 ent_type,
84 arm,
85 },
86 );
87 }
88
89 #[inline]
90 pub fn remove_entry(&mut self, id: &Eid) {
91 self.entries.remove(id);
92 }
93
94 fn recycle(
96 &self,
97 wal_armor: &VolumeWalArmor<Self>,
98 vol: &VolumeRef,
99 ) -> Result<()> {
100 for ent in self.entries.values() {
101 match ent.action {
102 Action::New | Action::Update => {} Action::Delete => match ent.ent_type {
104 EntityType::Cow => wal_armor.remove_all_arms(&ent.id)?,
105 EntityType::Direct => {
106 let mut vol = vol.write().unwrap();
107 vol.del(&ent.id)?;
108 }
109 },
110 }
111 }
112 Ok(())
113 }
114
115 pub fn clean_aborted(&self, vol: &VolumeRef) -> Result<()> {
117 for ent in self.entries.values() {
118 match ent.action {
119 Action::New => match ent.ent_type {
120 EntityType::Cow => Arm::remove_all(&ent.id, vol)?,
121 EntityType::Direct => {
122 let mut vol = vol.write().unwrap();
123 vol.del(&ent.id)?;
124 }
125 },
126 Action::Update => match ent.ent_type {
127 EntityType::Cow => ent.arm.remove_arm(&ent.id, vol)?,
128 EntityType::Direct => {
129 let mut vol = vol.write().unwrap();
130 vol.del(&ent.id)?;
131 }
132 },
133 Action::Delete => {} }
135 }
136 Ok(())
137 }
138}
139
140impl Hash for Wal {
141 fn hash<H: Hasher>(&self, state: &mut H) {
142 self.txid.val().hash(state);
143 }
144}
145
146impl PartialEq for Wal {
147 fn eq(&self, other: &Wal) -> bool {
148 self.txid == other.txid
149 }
150}
151
152impl Id for Wal {
153 #[inline]
154 fn id(&self) -> &Eid {
155 &self.id
156 }
157
158 #[inline]
159 fn id_mut(&mut self) -> &mut Eid {
160 &mut self.id
161 }
162}
163
164impl Seq for Wal {
165 #[inline]
166 fn seq(&self) -> u64 {
167 self.seq
168 }
169
170 #[inline]
171 fn inc_seq(&mut self) {
172 self.seq += 1
173 }
174}
175
176impl<'de> ArmAccess<'de> for Wal {
177 #[inline]
178 fn arm(&self) -> Arm {
179 self.arm
180 }
181
182 #[inline]
183 fn arm_mut(&mut self) -> &mut Arm {
184 &mut self.arm
185 }
186}
187
188#[derive(Default, Clone, Deserialize, Serialize)]
193struct WalQueue {
194 id: Eid,
195 seq: u64,
196 arm: Arm,
197
198 txid_wmark: u64,
200 blk_wmark: usize,
201
202 done: VecDeque<Txid>,
204
205 doing: HashSet<Txid>,
207
208 #[serde(skip_serializing, skip_deserializing, default)]
209 aborting: HashMap<Txid, Wal>,
210
211 #[serde(skip_serializing, skip_deserializing, default)]
212 wal_armor: VolumeWalArmor<Wal>,
213
214 #[serde(skip_serializing, skip_deserializing, default)]
215 allocator: AllocatorRef,
216
217 #[serde(skip_serializing, skip_deserializing, default)]
218 vol: VolumeRef,
219}
220
221impl WalQueue {
222 const COMMITTED_QUEUE_SIZE: usize = 2;
223
224 pub fn new(id: &Eid, vol: &VolumeRef) -> Self {
225 let allocator = {
226 let vol = vol.read().unwrap();
227 vol.get_allocator()
228 };
229 WalQueue {
230 id: id.clone(),
231 seq: 0,
232 arm: Arm::default(),
233 txid_wmark: 0,
234 blk_wmark: 0,
235 done: VecDeque::new(),
236 doing: HashSet::new(),
237 aborting: HashMap::new(),
238 wal_armor: VolumeWalArmor::new(vol),
239 allocator,
240 vol: vol.clone(),
241 }
242 }
243
244 #[inline]
245 fn watermarks(&self) -> (u64, usize) {
246 (self.txid_wmark, self.blk_wmark)
247 }
248
249 #[inline]
250 fn set_watermarks(&mut self, txid_wmark: u64, blk_wmark: usize) {
251 self.txid_wmark = txid_wmark;
252 self.blk_wmark = blk_wmark;
253 }
254
255 #[inline]
256 fn has_doing(&self) -> bool {
257 !self.doing.is_empty()
258 }
259
260 #[inline]
261 fn has_abort(&self) -> bool {
262 !self.aborting.is_empty()
263 }
264
265 #[inline]
266 fn open(&mut self, vol: &VolumeRef) {
267 self.wal_armor = VolumeWalArmor::new(vol);
268 self.vol = vol.clone();
269 }
270
271 #[inline]
272 fn begin_trans(&mut self, txid: Txid) {
273 assert!(!self.doing.contains(&txid));
274 self.doing.insert(txid);
275 }
276
277 fn recycle_trans(&mut self) -> Result<()> {
278 let retiree_txid = self.done.front().unwrap();
280 let retiree_id = Wal::derive_id(*retiree_txid);
281
282 debug!("recycle tx#{}", retiree_txid);
284 match self.wal_armor.load_item(&retiree_id) {
285 Ok(retiree) => {
286 retiree.recycle(&self.wal_armor, &self.vol)?;
288 self.wal_armor.remove_all_arms(&retiree_id)
289 }
290 Err(ref err) if *err == Error::NotFound => {
291 Ok(())
294 }
295 Err(err) => Err(err),
296 }
297 }
298
299 fn commit_trans(&mut self, wal: Wal) -> Result<()> {
300 while self.done.len() >= Self::COMMITTED_QUEUE_SIZE {
302 self.recycle_trans()?;
303 self.done.pop_front();
304 }
305
306 self.doing.remove(&wal.txid);
308 self.done.push_back(wal.txid);
309
310 Ok(())
311 }
312
313 #[inline]
314 fn begin_abort(&mut self, wal: &Wal) {
315 self.aborting.insert(wal.txid, wal.clone());
316 }
317
318 #[inline]
319 fn end_abort(&mut self, txid: Txid) {
320 self.aborting.remove(&txid);
321 self.doing.remove(&txid);
322 }
323
324 fn hot_redo_abort(&mut self) -> Result<()> {
326 let mut completed = Vec::new();
327
328 for wal in self.aborting.values() {
329 debug!("hot redo abort tx#{}", wal.txid);
330 wal.clean_aborted(&self.vol)?;
331 completed.push(wal.txid);
332 }
333
334 for txid in completed.iter() {
336 self.end_abort(*txid);
337 }
338
339 Ok(())
340 }
341
342 fn cold_redo_abort(&mut self) -> Result<()> {
344 let mut completed = Vec::new();
345
346 for txid in &self.doing {
347 debug!("cold redo abort tx#{}", txid);
348 let wal_id = Wal::derive_id(*txid);
349 match self.wal_armor.load_item(&wal_id) {
350 Ok(wal) => wal.clean_aborted(&self.vol)?,
351 Err(ref err) if *err == Error::NotFound => {}
352 Err(err) => return Err(err),
353 }
354 completed.push(*txid);
355 }
356
357 for txid in completed.iter() {
359 self.end_abort(*txid);
360 }
361
362 Ok(())
363 }
364}
365
366impl Debug for WalQueue {
367 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
368 f.debug_struct("WalQueue")
369 .field("id", &self.id)
370 .field("seq", &self.seq)
371 .field("arm", &self.arm)
372 .field("done", &self.done)
373 .field("doing", &self.doing)
374 .field("aborting", &self.aborting)
375 .finish()
376 }
377}
378
379impl Id for WalQueue {
380 #[inline]
381 fn id(&self) -> &Eid {
382 &self.id
383 }
384
385 #[inline]
386 fn id_mut(&mut self) -> &mut Eid {
387 &mut self.id
388 }
389}
390
391impl Seq for WalQueue {
392 #[inline]
393 fn seq(&self) -> u64 {
394 self.seq
395 }
396
397 #[inline]
398 fn inc_seq(&mut self) {
399 self.seq += 1
400 }
401}
402
403impl<'de> ArmAccess<'de> for WalQueue {
404 #[inline]
405 fn arm(&self) -> Arm {
406 self.arm
407 }
408
409 #[inline]
410 fn arm_mut(&mut self) -> &mut Arm {
411 &mut self.arm
412 }
413}
414
415#[derive(Default)]
417pub struct WalQueueMgr {
418 txid_wmark: Txid,
420
421 walq: WalQueue,
423 walq_backup: Option<WalQueue>,
424 walq_armor: VolumeWalArmor<WalQueue>,
425
426 allocator: AllocatorRef,
428
429 vol: VolumeRef,
430}
431
432impl WalQueueMgr {
433 pub fn new(walq_id: &Eid, vol: &VolumeRef) -> Self {
434 let allocator = {
435 let vol = vol.read().unwrap();
436 vol.get_allocator()
437 };
438 WalQueueMgr {
439 txid_wmark: Txid::from(0),
440 walq: WalQueue::new(walq_id, vol),
441 walq_backup: None,
442 walq_armor: VolumeWalArmor::new(vol),
443 allocator,
444 vol: vol.clone(),
445 }
446 }
447
448 pub fn open(&mut self, walq_id: &Eid) -> Result<()> {
449 self.walq = self.walq_armor.load_item(walq_id)?;
451 self.walq.open(&self.vol);
452
453 let (txid_wmark, blk_wmark) = self.walq.watermarks();
455 self.txid_wmark = Txid::from(txid_wmark);
456 {
457 let mut allocator = self.allocator.write().unwrap();
458 allocator.set_block_wmark(blk_wmark);
459 }
460
461 if self.walq.has_doing() {
463 self.backup_walq();
464 self.walq
465 .cold_redo_abort()
466 .and_then(|_| {
467 self.save_walq()?;
468 debug!("cold abort completed");
469 Ok(())
470 })
471 .or_else(|err| {
472 debug!("cold redo abort failed: {:?}", err);
473 self.restore_walq();
474 Err(err)
475 })?;
476 }
477
478 debug!(
479 "WalQueue opened, seq: {}, watermarks: (txid: {}, block: {})",
480 self.walq.seq(),
481 txid_wmark,
482 blk_wmark
483 );
484
485 Ok(())
486 }
487
488 #[inline]
489 pub fn next_txid(&mut self) -> Txid {
490 self.txid_wmark.next()
491 }
492
493 #[inline]
494 fn backup_walq(&mut self) {
495 self.walq_backup = Some(self.walq.clone());
496 }
497
498 #[inline]
499 fn restore_walq(&mut self) {
500 self.walq = self.walq_backup.take().unwrap();
501 }
502
503 fn save_walq(&mut self) -> Result<()> {
504 let blk_wmark = {
506 let allocator = self.allocator.read().unwrap();
507 allocator.block_wmark()
508 };
509 self.walq.set_watermarks(self.txid_wmark.val(), blk_wmark);
510
511 {
513 let mut vol = self.vol.write().unwrap();
514 vol.flush()
515 }
516 .and_then(|_| self.walq_armor.save_item(&mut self.walq))
517 }
518
519 pub fn begin_trans(&mut self, txid: Txid) -> Result<()> {
520 self.backup_walq();
521 self.walq.begin_trans(txid);
522 self.save_walq().or_else(|err| {
523 self.restore_walq();
524 Err(err)
525 })
526 }
527
528 pub fn commit_trans(&mut self, wal: Wal) -> Result<()> {
529 self.backup_walq();
530 self.walq
531 .commit_trans(wal)
532 .and_then(|_| self.save_walq())
533 .or_else(|err| {
534 self.restore_walq();
536 Err(err)
537 })
538 }
539
540 #[inline]
541 pub fn begin_abort(&mut self, wal: &Wal) {
542 self.walq.begin_abort(wal)
543 }
544
545 pub fn end_abort(&mut self, txid: Txid) -> Result<()> {
546 self.backup_walq();
547 self.walq.end_abort(txid);
548 self.save_walq().or_else(|err| {
549 self.restore_walq();
550 Err(err)
551 })
552 }
553
554 pub fn hot_redo_abort(&mut self) -> Result<()> {
555 if !self.walq.has_abort() {
556 return Ok(());
557 }
558
559 self.backup_walq();
560 self.walq
561 .hot_redo_abort()
562 .and_then(|_| {
563 self.save_walq()?;
564 debug!("hot abort completed");
565 Ok(())
566 })
567 .or_else(|err| {
568 debug!("hot redo abort failed: {:?}", err);
570 self.restore_walq();
571 Err(err)
572 })
573 }
574}
575
576impl Debug for WalQueueMgr {
577 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
578 f.debug_struct("WalQueueMgr")
579 .field("txid_wmark", &self.txid_wmark)
580 .field("walq", &self.walq)
581 .finish()
582 }
583}