zbox/trans/
wal.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fmt::{self, Debug};
3use std::hash::{Hash, Hasher};
4
5use linked_hash_map::LinkedHashMap;
6
7use super::trans::Action;
8use super::{Eid, Id, Txid};
9use base::crypto::{HashKey, HASHKEY_SIZE};
10use error::{Error, Result};
11use volume::{
12    AllocatorRef, Arm, ArmAccess, Armor, Seq, VolumeRef, VolumeWalArmor,
13};
14
15/// Wal entry entity type.
///
/// Decides how an entry's entity is cleaned up when a wal is recycled or an
/// aborted transaction is cleaned: `Cow` entities are removed through their
/// arms, `Direct` entities are deleted from the volume by id.
16#[derive(Debug, Clone, Copy, PartialEq, Deserialize, Serialize)]
17pub enum EntityType {
    /// Entity that is stored with arms (presumably copy-on-write; confirm
    /// against the volume armor implementation).
18    Cow,
    /// Entity that is stored in the volume directly, without arms.
19    Direct,
20}
21
22/// Wal entry.
///
/// Records one entity touched by a transaction. Two entries compare equal
/// when their entity ids are equal; the other fields are ignored by `eq`.
23#[derive(Debug, Clone, Deserialize, Serialize)]
24pub struct Entry {
    // id of the entity this entry is about
25    id: Eid,
    // action the transaction applied to the entity
26    action: Action,
    // how the entity is stored, which decides how it is cleaned up
27    ent_type: EntityType,
    // arm recorded with the entry; it is the arm removed when an aborted
    // update of a `Cow` entity is cleaned
28    arm: Arm,
29}
30
31impl PartialEq for Entry {
32    fn eq(&self, other: &Entry) -> bool {
33        self.id == other.id
34    }
35}
36
37impl Eq for Entry {}
38
39/// Write Ahead Log (WAL)
///
/// Holds the entries recorded for a single transaction. Equality and hashing
/// of a `Wal` are based on its `txid` only.
40#[derive(Debug, Clone, Default, Eq, Deserialize, Serialize)]
41pub struct Wal {
    // wal id, derived from `txid` in `Wal::new`
42    id: Eid,
    // sequence number, starts at 0 and is bumped by `inc_seq`
43    seq: u64,
    // arm of this wal, exposed through `ArmAccess`
44    arm: Arm,
    // id of the transaction this wal belongs to
45    txid: Txid,
    // entries keyed by entity id; adding an entry for an id that is
    // already present replaces the previous one
46    entries: LinkedHashMap<Eid, Entry>,
47}
48
49impl Wal {
50    // hash key for wal id derivation
51    const ID_HASH_KEY: [u8; HASHKEY_SIZE] = [42u8; HASHKEY_SIZE];
52
53    pub fn new(txid: Txid) -> Self {
54        Wal {
55            id: Self::derive_id(txid),
56            seq: 0,
57            arm: Arm::default(),
58            txid,
59            entries: LinkedHashMap::new(),
60        }
61    }
62
63    // derive wal id from txid
64    fn derive_id(txid: Txid) -> Eid {
65        let mut hash_key = HashKey::new_empty();
66        hash_key.copy(&Self::ID_HASH_KEY[..]);
67        txid.derive_id(&hash_key)
68    }
69
70    #[inline]
71    pub fn add_entry(
72        &mut self,
73        id: &Eid,
74        action: Action,
75        ent_type: EntityType,
76        arm: Arm,
77    ) {
78        self.entries.insert(
79            id.clone(),
80            Entry {
81                id: id.clone(),
82                action,
83                ent_type,
84                arm,
85            },
86        );
87    }
88
89    #[inline]
90    pub fn remove_entry(&mut self, id: &Eid) {
91        self.entries.remove(id);
92    }
93
94    // recycle tx entries in a wal
95    fn recycle(
96        &self,
97        wal_armor: &VolumeWalArmor<Self>,
98        vol: &VolumeRef,
99    ) -> Result<()> {
100        for ent in self.entries.values() {
101            match ent.action {
102                Action::New | Action::Update => {} // do nothing
103                Action::Delete => match ent.ent_type {
104                    EntityType::Cow => wal_armor.remove_all_arms(&ent.id)?,
105                    EntityType::Direct => {
106                        let mut vol = vol.write().unwrap();
107                        vol.del(&ent.id)?;
108                    }
109                },
110            }
111        }
112        Ok(())
113    }
114
115    // clean each aborted entry in wal
116    pub fn clean_aborted(&self, vol: &VolumeRef) -> Result<()> {
117        for ent in self.entries.values() {
118            match ent.action {
119                Action::New => match ent.ent_type {
120                    EntityType::Cow => Arm::remove_all(&ent.id, vol)?,
121                    EntityType::Direct => {
122                        let mut vol = vol.write().unwrap();
123                        vol.del(&ent.id)?;
124                    }
125                },
126                Action::Update => match ent.ent_type {
127                    EntityType::Cow => ent.arm.remove_arm(&ent.id, vol)?,
128                    EntityType::Direct => {
129                        let mut vol = vol.write().unwrap();
130                        vol.del(&ent.id)?;
131                    }
132                },
133                Action::Delete => {} // do nothing
134            }
135        }
136        Ok(())
137    }
138}
139
140impl Hash for Wal {
141    fn hash<H: Hasher>(&self, state: &mut H) {
142        self.txid.val().hash(state);
143    }
144}
145
146impl PartialEq for Wal {
147    fn eq(&self, other: &Wal) -> bool {
148        self.txid == other.txid
149    }
150}
151
152impl Id for Wal {
153    #[inline]
154    fn id(&self) -> &Eid {
155        &self.id
156    }
157
158    #[inline]
159    fn id_mut(&mut self) -> &mut Eid {
160        &mut self.id
161    }
162}
163
164impl Seq for Wal {
165    #[inline]
166    fn seq(&self) -> u64 {
167        self.seq
168    }
169
170    #[inline]
171    fn inc_seq(&mut self) {
172        self.seq += 1
173    }
174}
175
176impl<'de> ArmAccess<'de> for Wal {
177    #[inline]
178    fn arm(&self) -> Arm {
179        self.arm
180    }
181
182    #[inline]
183    fn arm_mut(&mut self) -> &mut Arm {
184        &mut self.arm
185    }
186}
187
188/// Wal queue
189///
190/// The whole wal queue should be able to fit into one block, so
191/// the persisted size should be less than one block size.
///
/// Only the id, sequence number, arm, watermarks and the `done`/`doing`
/// transaction lists are persisted; the remaining fields are skipped by
/// serde and take their `Default` value when a queue is deserialized.
192#[derive(Default, Clone, Deserialize, Serialize)]
193struct WalQueue {
    // queue id, sequence number and arm, exposed through the `Id`, `Seq`
    // and `ArmAccess` impls
194    id: Eid,
195    seq: u64,
196    arm: Arm,
197
198    // txid and block watermarks, stored with the queue
199    txid_wmark: u64,
200    blk_wmark: usize,
201
202    // completed tx queue; committed txids are pushed at the back and the
    // one at the front is the next to be recycled
203    done: VecDeque<Txid>,
204
205    // in-progress tx id list
206    doing: HashSet<Txid>,
207
    // wals of txs whose abort has begun but has not ended yet, kept in
    // memory only
208    #[serde(skip_serializing, skip_deserializing, default)]
209    aborting: HashMap<Txid, Wal>,
210
    // armor used to load and remove wals, not persisted
211    #[serde(skip_serializing, skip_deserializing, default)]
212    wal_armor: VolumeWalArmor<Wal>,
213
    // block allocator handle, not persisted
214    #[serde(skip_serializing, skip_deserializing, default)]
215    allocator: AllocatorRef,
216
    // volume handle, not persisted
217    #[serde(skip_serializing, skip_deserializing, default)]
218    vol: VolumeRef,
219}
220
221impl WalQueue {
222    const COMMITTED_QUEUE_SIZE: usize = 2;
223
224    pub fn new(id: &Eid, vol: &VolumeRef) -> Self {
225        let allocator = {
226            let vol = vol.read().unwrap();
227            vol.get_allocator()
228        };
229        WalQueue {
230            id: id.clone(),
231            seq: 0,
232            arm: Arm::default(),
233            txid_wmark: 0,
234            blk_wmark: 0,
235            done: VecDeque::new(),
236            doing: HashSet::new(),
237            aborting: HashMap::new(),
238            wal_armor: VolumeWalArmor::new(vol),
239            allocator,
240            vol: vol.clone(),
241        }
242    }
243
244    #[inline]
245    fn watermarks(&self) -> (u64, usize) {
246        (self.txid_wmark, self.blk_wmark)
247    }
248
249    #[inline]
250    fn set_watermarks(&mut self, txid_wmark: u64, blk_wmark: usize) {
251        self.txid_wmark = txid_wmark;
252        self.blk_wmark = blk_wmark;
253    }
254
255    #[inline]
256    fn has_doing(&self) -> bool {
257        !self.doing.is_empty()
258    }
259
260    #[inline]
261    fn has_abort(&self) -> bool {
262        !self.aborting.is_empty()
263    }
264
265    #[inline]
266    fn open(&mut self, vol: &VolumeRef) {
267        self.wal_armor = VolumeWalArmor::new(vol);
268        self.vol = vol.clone();
269    }
270
271    #[inline]
272    fn begin_trans(&mut self, txid: Txid) {
273        assert!(!self.doing.contains(&txid));
274        self.doing.insert(txid);
275    }
276
277    fn recycle_trans(&mut self) -> Result<()> {
278        // get retiree from the end of done queue
279        let retiree_txid = self.done.front().unwrap();
280        let retiree_id = Wal::derive_id(*retiree_txid);
281
282        // load the retired wal
283        debug!("recycle tx#{}", retiree_txid);
284        match self.wal_armor.load_item(&retiree_id) {
285            Ok(retiree) => {
286                // recycle and remove the wal
287                retiree.recycle(&self.wal_armor, &self.vol)?;
288                self.wal_armor.remove_all_arms(&retiree_id)
289            }
290            Err(ref err) if *err == Error::NotFound => {
291                // wal is already recycled and removed, do nothing
292                // here but skip it
293                Ok(())
294            }
295            Err(err) => Err(err),
296        }
297    }
298
299    fn commit_trans(&mut self, wal: Wal) -> Result<()> {
300        // recycle the retired trans
301        while self.done.len() >= Self::COMMITTED_QUEUE_SIZE {
302            self.recycle_trans()?;
303            self.done.pop_front();
304        }
305
306        // remove txid from doing list and enqueue it
307        self.doing.remove(&wal.txid);
308        self.done.push_back(wal.txid);
309
310        Ok(())
311    }
312
313    #[inline]
314    fn begin_abort(&mut self, wal: &Wal) {
315        self.aborting.insert(wal.txid, wal.clone());
316    }
317
318    #[inline]
319    fn end_abort(&mut self, txid: Txid) {
320        self.aborting.remove(&txid);
321        self.doing.remove(&txid);
322    }
323
324    // hot redo failed abort
325    fn hot_redo_abort(&mut self) -> Result<()> {
326        let mut completed = Vec::new();
327
328        for wal in self.aborting.values() {
329            debug!("hot redo abort tx#{}", wal.txid);
330            wal.clean_aborted(&self.vol)?;
331            completed.push(wal.txid);
332        }
333
334        // remove all txs which are completed retrying abort
335        for txid in completed.iter() {
336            self.end_abort(*txid);
337        }
338
339        Ok(())
340    }
341
342    // cold redo failed abort
343    fn cold_redo_abort(&mut self) -> Result<()> {
344        let mut completed = Vec::new();
345
346        for txid in &self.doing {
347            debug!("cold redo abort tx#{}", txid);
348            let wal_id = Wal::derive_id(*txid);
349            match self.wal_armor.load_item(&wal_id) {
350                Ok(wal) => wal.clean_aborted(&self.vol)?,
351                Err(ref err) if *err == Error::NotFound => {}
352                Err(err) => return Err(err),
353            }
354            completed.push(*txid);
355        }
356
357        // remove all txs which are succeed to retry abort
358        for txid in completed.iter() {
359            self.end_abort(*txid);
360        }
361
362        Ok(())
363    }
364}
365
366impl Debug for WalQueue {
367    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
368        f.debug_struct("WalQueue")
369            .field("id", &self.id)
370            .field("seq", &self.seq)
371            .field("arm", &self.arm)
372            .field("done", &self.done)
373            .field("doing", &self.doing)
374            .field("aborting", &self.aborting)
375            .finish()
376    }
377}
378
379impl Id for WalQueue {
380    #[inline]
381    fn id(&self) -> &Eid {
382        &self.id
383    }
384
385    #[inline]
386    fn id_mut(&mut self) -> &mut Eid {
387        &mut self.id
388    }
389}
390
391impl Seq for WalQueue {
392    #[inline]
393    fn seq(&self) -> u64 {
394        self.seq
395    }
396
397    #[inline]
398    fn inc_seq(&mut self) {
399        self.seq += 1
400    }
401}
402
403impl<'de> ArmAccess<'de> for WalQueue {
404    #[inline]
405    fn arm(&self) -> Arm {
406        self.arm
407    }
408
409    #[inline]
410    fn arm_mut(&mut self) -> &mut Arm {
411        &mut self.arm
412    }
413}
414
415/// WalQueue Manager
///
/// Owns the wal queue and persists it after every change. Each mutating
/// operation first takes an in-memory backup of the queue and restores it
/// if the change cannot be saved.
416#[derive(Default)]
417pub struct WalQueueMgr {
418    // txid watermark, written into the wal queue each time it is saved
419    txid_wmark: Txid,
420
421    // wal queue and wal queue armor; the backup is the copy of the queue
    // taken before the latest change
422    walq: WalQueue,
423    walq_backup: Option<WalQueue>,
424    walq_armor: VolumeWalArmor<WalQueue>,
425
426    // block allocator, its block watermark is saved with the wal queue
427    allocator: AllocatorRef,
428
    // volume, flushed before the wal queue is saved
429    vol: VolumeRef,
430}
431
432impl WalQueueMgr {
433    pub fn new(walq_id: &Eid, vol: &VolumeRef) -> Self {
434        let allocator = {
435            let vol = vol.read().unwrap();
436            vol.get_allocator()
437        };
438        WalQueueMgr {
439            txid_wmark: Txid::from(0),
440            walq: WalQueue::new(walq_id, vol),
441            walq_backup: None,
442            walq_armor: VolumeWalArmor::new(vol),
443            allocator,
444            vol: vol.clone(),
445        }
446    }
447
448    pub fn open(&mut self, walq_id: &Eid) -> Result<()> {
449        // load wal queue
450        self.walq = self.walq_armor.load_item(walq_id)?;
451        self.walq.open(&self.vol);
452
453        // restore watermarks
454        let (txid_wmark, blk_wmark) = self.walq.watermarks();
455        self.txid_wmark = Txid::from(txid_wmark);
456        {
457            let mut allocator = self.allocator.write().unwrap();
458            allocator.set_block_wmark(blk_wmark);
459        }
460
461        // now redo abort tx if any
462        if self.walq.has_doing() {
463            self.backup_walq();
464            self.walq
465                .cold_redo_abort()
466                .and_then(|_| {
467                    self.save_walq()?;
468                    debug!("cold abort completed");
469                    Ok(())
470                })
471                .or_else(|err| {
472                    debug!("cold redo abort failed: {:?}", err);
473                    self.restore_walq();
474                    Err(err)
475                })?;
476        }
477
478        debug!(
479            "WalQueue opened, seq: {}, watermarks: (txid: {}, block: {})",
480            self.walq.seq(),
481            txid_wmark,
482            blk_wmark
483        );
484
485        Ok(())
486    }
487
488    #[inline]
489    pub fn next_txid(&mut self) -> Txid {
490        self.txid_wmark.next()
491    }
492
493    #[inline]
494    fn backup_walq(&mut self) {
495        self.walq_backup = Some(self.walq.clone());
496    }
497
498    #[inline]
499    fn restore_walq(&mut self) {
500        self.walq = self.walq_backup.take().unwrap();
501    }
502
503    fn save_walq(&mut self) -> Result<()> {
504        // get current block watermark and set it to wal queue
505        let blk_wmark = {
506            let allocator = self.allocator.read().unwrap();
507            allocator.block_wmark()
508        };
509        self.walq.set_watermarks(self.txid_wmark.val(), blk_wmark);
510
511        // flush volume then save wal queue
512        {
513            let mut vol = self.vol.write().unwrap();
514            vol.flush()
515        }
516        .and_then(|_| self.walq_armor.save_item(&mut self.walq))
517    }
518
519    pub fn begin_trans(&mut self, txid: Txid) -> Result<()> {
520        self.backup_walq();
521        self.walq.begin_trans(txid);
522        self.save_walq().or_else(|err| {
523            self.restore_walq();
524            Err(err)
525        })
526    }
527
528    pub fn commit_trans(&mut self, wal: Wal) -> Result<()> {
529        self.backup_walq();
530        self.walq
531            .commit_trans(wal)
532            .and_then(|_| self.save_walq())
533            .or_else(|err| {
534                // if commit failed, restore the walq backup
535                self.restore_walq();
536                Err(err)
537            })
538    }
539
540    #[inline]
541    pub fn begin_abort(&mut self, wal: &Wal) {
542        self.walq.begin_abort(wal)
543    }
544
545    pub fn end_abort(&mut self, txid: Txid) -> Result<()> {
546        self.backup_walq();
547        self.walq.end_abort(txid);
548        self.save_walq().or_else(|err| {
549            self.restore_walq();
550            Err(err)
551        })
552    }
553
554    pub fn hot_redo_abort(&mut self) -> Result<()> {
555        if !self.walq.has_abort() {
556            return Ok(());
557        }
558
559        self.backup_walq();
560        self.walq
561            .hot_redo_abort()
562            .and_then(|_| {
563                self.save_walq()?;
564                debug!("hot abort completed");
565                Ok(())
566            })
567            .or_else(|err| {
568                // if failed, restore the walq backup
569                debug!("hot redo abort failed: {:?}", err);
570                self.restore_walq();
571                Err(err)
572            })
573    }
574}
575
576impl Debug for WalQueueMgr {
577    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
578        f.debug_struct("WalQueueMgr")
579            .field("txid_wmark", &self.txid_wmark)
580            .field("walq", &self.walq)
581            .finish()
582    }
583}