1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use chrono::prelude::*;
use chrono::NaiveTime;
use super::raft_enum::{Role, Which, Fields};
use std::sync::{Arc, Mutex};
use super::raft_conf::{CONF, RV, ConfigRaft};
use std::collections::HashMap;
use crate::trans::client::req_post;
use crate::raft::db::{get, DB, remove, insert};
use super::req::{ask_confirm_leader, ask_find_leader};
use std::time::Duration;
use crate::raft::req::{ask_heart_beat, ask_peer_urls, ask_peers_vote, find_leader_again, ask_snapshot_ids, ask_leader_change};
use crate::raft::raft_conf::RaftVar;

pub async fn cron_app() -> anyhow::Result<()> {
    let conf = CONF.get().expect("can not get config raft");
    let tick_hb = conf.heartbeat_tick;
    let duration = Duration::from_secs(tick_hb as u64);

    let mut timer_vote = Timer::init(conf.election_tick);
    let mut timer_peer = Timer::init(conf.update_peers_tick);
    let mut timer_snapshot = Timer::init(conf.update_snapshot_tick);
    let mut timer_clear = Timer::init(conf.clear_db_tick);

    if !conf.first_node {
        ask_find_leader(&conf.url_peer).await.unwrap();
        ask_confirm_leader().await.unwrap();
    }

    loop {
        tokio::time::delay_for(duration).await;
        dbg!("round anaig");

        let role = RaftVar::role();

        if role == Role::follower.name().to_string() {
            let rec_hb = ask_heart_beat().await?;
            if rec_hb {
                timer_vote.sleep_again();
            } else {
                if timer_vote.wake_up() {
                    RaftVar::set_role(Role::candidate.name());
                    timer_vote.sleep_again();
                }
            }
            if timer_peer.wake_up() {
                ask_peer_urls().await?;
                timer_peer.sleep_again();
            }
            if timer_snapshot.wake_up() {
                ask_snapshot_ids().await?;
                timer_snapshot.sleep_again();
            }
        }

        if role == Role::candidate.name().to_string() {
            let win = ask_peers_vote().await?;
            if win {
                ask_leader_change().await?;
            } else {
                let find = find_leader_again().await?;
                if !find {
                    panic!("last contact from all peers and leader");
                }
                RaftVar::set_role(Role::follower.name());
                timer_vote.sleep_again();
            }
        }
        if role == Role::leader.name().to_string() {
            if timer_clear.wake_up() {
                clear_db()?;
                timer_clear.sleep_again();
            }
        }
    }
}


fn clear_db() -> anyhow::Result<()> {
    let ids = RaftVar::snap_ids();
    for kv in DB.iter() {
        let res = kv?.0.to_vec();
        let k = String::from_utf8(res)?;
        if !ids.contains(&k) && k != Fields::peer_urls.name().to_string() {
            remove(&k)?;
        }
    }
    Ok(())
}


struct Timer {
    ticks: i64,
    last: chrono::NaiveTime,
}

impl Timer {
    fn init(tick: i64) -> Self {
        Timer {
            ticks: tick,
            last: Local::now().time(),
        }
    }
    fn wake_up(&self) -> bool {
        let now = Local::now().time();
        let differ = now - self.last;
        let elapse = differ.num_seconds();
        elapse > self.ticks
    }
    fn sleep_again(&mut self) {
        let now = Local::now().time();
        self.last = now;
    }
}