1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use chord_rs_core::{client::ClientError, Node, NodeId};
use error_stack::{IntoReport, ResultExt};
use futures::Future;

use crate::{
    chord_capnp::{self, chord_node::Client},
    client::CapnpClientError,
    parser::{ParserError, ResultBuilder},
};

use super::CmdResult;

/// A request that can be issued against a remote chord node.
///
/// Every variant carries a [`CmdResult`] handle typed with the value the
/// request is expected to produce; variants that need input carry it as the
/// first field. The code that dispatches these variants is not in this file —
/// presumably each one is routed to the associated function of the matching
/// name on this type (TODO confirm against the dispatcher).
#[derive(Debug)]
pub(crate) enum Command {
    /// Look up the successor for the given [`NodeId`]; yields a [`Node`].
    FindSuccessor(NodeId, CmdResult<Node>),
    /// Ask for the node's successor; yields a [`Node`].
    Successor(CmdResult<Node>),
    /// Ask for the node's successor list; yields a `Vec<Node>`.
    SuccessorList(CmdResult<Vec<Node>>),
    /// Ask for the node's predecessor; yields `None` when there is none.
    Predecessor(CmdResult<Option<Node>>),
    /// Notify the node about the given [`Node`]; yields nothing on success.
    Notify(Node, CmdResult<()>),
    /// Ping the node; yields nothing on success.
    Ping(CmdResult<()>),
}

impl Command {
    /// Returns the [`ClientError`] variant that describes a failure of this
    /// command (e.g. `Ping` maps to `ClientError::PingFailed`).
    pub(crate) fn get_error(&self) -> ClientError {
        match self {
            Command::FindSuccessor(_, _) => ClientError::FindSuccessorFailed,
            Command::Successor(_) => ClientError::GetSuccessorFailed,
            Command::SuccessorList(_) => ClientError::GetSuccessorListFailed,
            Command::Predecessor(_) => ClientError::GetPredecessorFailed,
            Command::Notify(_, _) => ClientError::NotifyFailed,
            Command::Ping(_) => ClientError::PingFailed,
        }
    }

    /// Sends a ping request through `client` and reports the outcome on
    /// `sender`. Failures are annotated with `ClientError::PingFailed`.
    pub(crate) async fn ping(client: Client, sender: CmdResult<()>) {
        Self::handle_request(sender, ClientError::PingFailed, || async {
            let request = client.ping_request();

            request.send().promise.await?;
            Ok(())
        })
        .await
    }

    /// Sends a find-successor request for `id` through `client` and reports
    /// the parsed [`Node`] on `sender`. Failures (RPC or parsing) are
    /// annotated with `ClientError::FindSuccessorFailed`.
    pub(crate) async fn find_successor(client: Client, id: NodeId, sender: CmdResult<Node>) {
        Self::handle_request(sender, ClientError::FindSuccessorFailed, || async {
            let mut request = client.find_successor_request();
            request.get().set_id(id.into());

            let reply = request.send().promise.await?;
            let node = reply.get()?.get_node()?.try_into()?;

            Ok(node)
        })
        .await
    }

    /// Sends a get-successor request through `client` and reports the parsed
    /// [`Node`] on `sender`. Failures are annotated with
    /// `ClientError::GetSuccessorFailed`.
    pub(crate) async fn get_successor(client: Client, sender: CmdResult<Node>) {
        Self::handle_request(sender, ClientError::GetSuccessorFailed, || async {
            let request = client.get_successor_request();

            let reply = request.send().promise.await?;
            let successor = reply.get()?.get_node()?.try_into()?;
            Ok(successor)
        })
        .await
    }

    /// Sends a get-successor-list request through `client` and reports the
    /// parsed nodes on `sender`. Parsing stops at the first entry that fails
    /// to convert. Failures are annotated with
    /// `ClientError::GetSuccessorListFailed`.
    pub(crate) async fn get_successor_list(client: Client, sender: CmdResult<Vec<Node>>) {
        Self::handle_request(sender, ClientError::GetSuccessorListFailed, || async {
            let request = client.get_successor_list_request();

            let reply = request.send().promise.await?;
            let nodes = reply.get()?.get_nodes()?;
            // Collecting into `Result` short-circuits on the first bad entry.
            let successors: Vec<Node> = nodes
                .iter()
                .map(|node| node.try_into())
                .collect::<Result<Vec<Node>, ParserError>>()?;
            Ok(successors)
        })
        .await
    }

    /// Sends a get-predecessor request through `client` and reports the
    /// result on `sender`: `None` when the reply holds the `None` variant,
    /// `Some(node)` when it holds a node that parses successfully. Failures
    /// are annotated with `ClientError::GetPredecessorFailed`.
    pub(crate) async fn get_predecessor(client: Client, sender: CmdResult<Option<Node>>) {
        Self::handle_request(sender, ClientError::GetPredecessorFailed, || async {
            let request = client.get_predecessor_request();

            let reply = request.send().promise.await?;
            let node = reply.get()?.get_node()?;
            match node.which() {
                Ok(chord_capnp::option::None(())) => Ok(None),
                Ok(chord_capnp::option::Some(Ok(reader))) => {
                    let result: Result<Node, ParserError> = reader.try_into();
                    let node = result?;
                    Ok(Some(node))
                }
                // The `Some` variant was present but its payload could not be read.
                Ok(chord_capnp::option::Some(Err(err))) => Err(err.into()),
                // The union discriminant itself was not recognised.
                Err(err) => Err(err.into()),
            }
        })
        .await
    }

    /// Sends a notify request carrying `predecessor` through `client` and
    /// reports the outcome on `sender`. Failures — building the request or
    /// the RPC itself — are annotated with `ClientError::NotifyFailed`.
    pub(crate) async fn notify(client: Client, predecessor: Node, sender: CmdResult<()>) {
        Self::handle_request(sender, ClientError::NotifyFailed, || async {
            let mut request = client.notify_request();
            let node = request.get().init_node();
            node.insert(predecessor)?;

            // Propagate RPC failures like every other request does, instead of
            // reporting success for a notify that never reached the peer.
            request.send().promise.await?;
            Ok(())
        })
        .await
    }

    /// Runs the request produced by `f` and delivers its result on `sender`.
    ///
    /// The error of the request is converted with `Into`, turned into a
    /// report via `into_report`, and `ctx` is attached to that report as a
    /// printable so the failure names the command that caused it.
    ///
    /// If the result cannot be delivered (`sender.send` fails, e.g. because
    /// the receiving side is gone) it is dropped; this function never panics
    /// on delivery.
    async fn handle_request<F, Res>(sender: CmdResult<Res>, ctx: ClientError, f: impl FnOnce() -> F)
    where
        F: Future<Output = Result<Res, CapnpClientError>>,
        Res: std::fmt::Debug,
    {
        let result = f()
            .await
            .map_err(|err| err.into())
            .into_report()
            .attach_printable_lazy(|| ctx);

        // A failed send means nobody is waiting for this result any more;
        // that is not worth panicking the task over, so the result is dropped.
        let _ = sender.send(result);
    }
}