1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use super::raft_conf::{ConfigRaft, RaftVar,CONF};
use std::collections::{HashMap, HashSet,VecDeque};
use crate::trans::client::req_post;
use super::raft_enum::{Role, Which, Fields};
use std::sync::{Arc, Mutex};
use super::db::*;

// befor loop and after election fail

pub async fn ask_find_leader(url: &str) -> anyhow::Result<()> {
    let res = req_post(url, Which::ask_leader, "").await?;
    // change leader here

    RaftVar::set_leader_url(&res);
    Ok(())
}

// befor loop

pub async fn ask_confirm_leader() -> anyhow::Result<bool> {
    let leader=RaftVar::leader_url();

    let res = req_post(&leader, Which::confirm_leader, "").await?;
    if res == Role::leader.name() {
        return Ok(true);
    }
    Ok(false)
}

// high frequency

pub async fn ask_heart_beat() -> anyhow::Result<bool> {
    let leader_url=RaftVar::leader_url();
    if leader_url == "".to_string() {
        panic!("leader url is empty")
    }
    let res = req_post(&leader_url, Which::heart_beat, "").await?;
    if res == Fields::success.name() {
        return Ok(true);
    }
    Ok(false)
}

// low frequency,report my url and update peer urls

pub async fn ask_peer_urls() -> anyhow::Result<bool> {
    let leader_url=RaftVar::leader_url();
    let conf = CONF.get().expect("can not get config raft");
    let me = &conf.url_me;
    let res = req_post(&leader_url, Which::peer_urls, me).await?;
    let res1: HashSet<String> = serde_json::from_str(&res)?;
    replace_set_from_set(Fields::peer_urls.name(), &res1)?;
    del_str_in_set(Fields::peer_urls.name(), &conf.url_me)?;
    Ok(true)
}
pub async fn ask_snapshot_ids() -> anyhow::Result<()> {
    let leader_url=RaftVar::leader_url();
    let res = req_post(&leader_url, Which::snapshot_ids, "").await?;
    let res1: VecDeque<String> = serde_json::from_str(&res)?;
    RaftVar::replace_snap_ids(res1);
    Ok(())
}

// low frequency,check all url and del dead one,result will be success,fail ,none(err dead)

pub async fn ask_peers_vote() -> anyhow::Result<bool> {
    let snap=RaftVar::snap_ids();
    let snapshot = serde_json::to_string(&snap)?;

    let _peers = get(Fields::peer_urls.name())?;
    let peers: HashSet<String> = serde_json::from_str(&_peers)?;

    let mut score = HashMap::new();

    for peer in peers.iter() {
        let res = req_post(peer, Which::peer_vote, &snapshot).await;
        match res {
            Ok(s) => score.insert(peer.to_string(), s),
            Err(e) => score.insert(peer.to_string(), Fields::none.name().to_string()),
        };
    }
    check_i_am_leader(&score)
}

fn check_i_am_leader(score: &HashMap<String, String>) -> anyhow::Result<bool> {
    let mut peers = HashSet::new();
    let mut agree = 0;
    let mut reject = 0;
    for (k, v) in score.iter() {
        if v == Fields::success.name() {
            peers.insert(k.to_string());
            agree = agree + 1;
        }
        if v == Fields::fail.name() {
            peers.insert(k.to_string());
            reject = reject + 1;
        }
    }
    if peers.len() > 0 {
        replace_set_from_set(Fields::peer_urls.name(), &peers)?;
    }
    if agree >= reject {
        return Ok(true);
    }
    Ok(false)
}

// if not find should panic

pub async fn find_leader_again() -> anyhow::Result<bool> {
    let peers = get(Fields::peer_urls.name())?;
    let peers: HashSet<String> = serde_json::from_str(&peers)?;
    for peer in peers.iter() {
        ask_find_leader(peer).await?;
        let find = ask_confirm_leader().await?;
        if find {
            RaftVar::set_role(Role::follower.name());
            return Ok(true);
        }
    }
    Ok(false)
}

pub async fn ask_leader_change() -> anyhow::Result<()> {
    let conf = CONF.get().expect("can not get config raft");
    let _peers = get(Fields::peer_urls.name())?;
    let peers: HashSet<String> = serde_json::from_str(&_peers)?;
    for peer in peers.iter() {
        req_post(peer, Which::leader_change, &conf.url_me).await?;
    }
    Ok(())
}