1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use crate::trans::client::req_post;
use crate::raft::db::*;
use super::raft_enum::{Role, Which, Fields};
use super::raft_conf::{ConfigRaft, CONF, RaftVar};
use std::sync::{Arc, Mutex};
use std::collections::{HashMap, HashSet, VecDeque};
use super::req::ask_confirm_leader;

pub async fn resp_find_leader() -> anyhow::Result<String> {
    let leader_url=RaftVar::leader_url();
    if leader_url == "".to_string() {
        return Err(anyhow!("the leader url of mine is emtpy too"));
    }
    Ok(leader_url)
}


pub async fn resp_confirm_leader() -> anyhow::Result<String> {
    Ok(RaftVar::role())
}

pub async fn resp_heart_beat() -> anyhow::Result<String> {
    let role = RaftVar::role();
    if role == Role::leader.name().to_string() {
        return Ok(Fields::success.name().to_string());
    }
    Ok(Fields::fail.name().to_string())
}

// save url inclue leader

pub async fn resp_peer_urls(url_peer: &str) -> anyhow::Result<String> {
    let conf = CONF.get().expect("can not get config raft");
    let mut s = HashSet::new();
    s.insert(url_peer.to_string());
    s.insert(conf.url_me.to_string());
    let res = update_set_from_set(Fields::peer_urls.name(), &s)?;
    Ok(res)
}

pub async fn resp_append_entry(data: &str) -> anyhow::Result<String> {
    let now = chrono::Local::now();
    let id = now.timestamp_nanos().to_string();
    RaftVar::add_snap_ids(&id);
    insert(&id, data)?;
    Ok(id)
}

pub async fn resp_query_id(id: &str) -> anyhow::Result<String> {
    let res = get(id)?;
    Ok(res)
}

pub async fn resp_snapshot_ids() -> anyhow::Result<String> {
    let ids=RaftVar::snap_ids();
    let res = serde_json::to_string(&ids)?;
    Ok(res)
}

pub async fn resp_peers_vote(snapshot: &str) -> anyhow::Result<String> {
    let role = RaftVar::role();
// if i am leader ,return false

    if role == Role::leader.name().to_string() {
        return Ok(Fields::fail.name().to_string());
    }
    // if current leader confirm fail

    let leader_exist = ask_confirm_leader().await?;
    if leader_exist {
        return Ok(Fields::fail.name().to_string());
    }
    // if yours contains mine

    let better = yours_snapshot_better(snapshot)?;
    if !better {
        return Ok(Fields::fail.name().to_string());
    }
    Ok(Fields::success.name().to_string())
}

fn yours_snapshot_better(snapshot: &str) -> anyhow::Result<bool> {
    let mine=RaftVar::snap_ids();
    let yours:VecDeque<String> = serde_json::from_str(&snapshot)?;
    for my in mine.iter() {
        if !yours.contains(my) {
            return Ok(false);
        }
    }
    Ok(true)
}

pub async fn resp_leader_change(leader_url: &str) -> anyhow::Result<String> {
    let role=RaftVar::role();
// if i am not follower

    if role != Role::follower.name().to_string() {
        RaftVar::set_role(Role::follower.name());
    }
    // change leader here

    RaftVar::set_leader_url(leader_url);
    Ok(Fields::success.name().to_string())
}