1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
use std::sync::Arc;

use keri_core::{
    actor::{error::ActorError, simple_controller::PossibleResponse},
    event_message::signed_event_message::{Message, Notice, Op, SignedEventMessage},
    oobi::{EndRole, LocationScheme, Oobi, Scheme},
    prefix::{BasicPrefix, IdentifierPrefix},
    query::query_event::SignedKelQuery,
    transport::Transport,
};
use teliox::transport::GeneralTelTransport;

use crate::{error::ControllerError, known_events::KnownEvents};

pub struct Communication {
    pub events: Arc<KnownEvents>,
    pub transport: Box<dyn Transport + Send + Sync>,
    pub tel_transport: Box<dyn GeneralTelTransport + Send + Sync>,
}

impl Communication {
    pub fn new(
        known_events: Arc<KnownEvents>,
        transport: Box<dyn Transport<ActorError> + Send + Sync>,
        tel_transport: Box<dyn GeneralTelTransport + Send + Sync>,
    ) -> Self {
        Communication {
            events: known_events,
            transport,
            tel_transport,
        }
    }

    /// Requests the OOBI replies for the location `lc` and stores them.
    ///
    /// Every item returned by the transport is wrapped in `Message::Op` and
    /// handed to the event store.
    ///
    /// # Errors
    ///
    /// Returns an error if the transport request fails, or as soon as one
    /// reply cannot be saved. No rollback of already saved replies is
    /// attempted here.
    pub async fn resolve_loc_schema(&self, lc: &LocationScheme) -> Result<(), ControllerError> {
        let replies = self.transport.request_loc_scheme(lc.clone()).await?;
        for reply in replies {
            let wrapped = Message::Op(reply);
            self.events.save(&wrapped)?;
        }
        Ok(())
    }

    /// Requests the end-role information described by `er` and stores the
    /// messages that come back.
    ///
    /// The request goes to the first location stored for `er.cid`. Signed
    /// replies are saved as OOBIs; any other message goes through the general
    /// `save` path.
    ///
    /// # Errors
    ///
    /// Returns `ControllerError::UnknownIdentifierError` when no location is
    /// stored for `er.cid`. Lookup, transport and save failures are propagated.
    pub async fn resolve_end_role(&self, er: &EndRole) -> Result<(), ControllerError> {
        let EndRole { cid, role, eid } = er.clone();

        // TODO what if more than one
        let known_locations = self.events.get_loc_schemas(&cid)?;
        let loc = known_locations
            .first()
            .ok_or(ControllerError::UnknownIdentifierError)?
            .clone();

        let responses = self.transport.request_end_role(loc, cid, role, eid).await?;
        for response in responses {
            // TODO This ignore signatures. Add verification.
            match response {
                Message::Op(Op::Reply(signed_oobi)) => {
                    self.events.save_oobi(&signed_oobi)?;
                }
                other => {
                    self.events.save(&other)?;
                }
            }
        }
        Ok(())
    }

    /// Resolves an OOBI by dispatching on its variant.
    ///
    /// `Oobi::Location` is handled by `resolve_loc_schema` and `Oobi::EndRole`
    /// by `resolve_end_role`; the result of the chosen method is returned
    /// unchanged.
    pub async fn resolve_oobi(&self, oobi: &Oobi) -> Result<(), ControllerError> {
        match oobi {
            Oobi::Location(location) => self.resolve_loc_schema(location).await,
            Oobi::EndRole(end_role) => self.resolve_end_role(end_role).await,
        }
    }

    /// Sends `msg` to the location stored for `id` under `scheme`.
    ///
    /// # Errors
    ///
    /// Fails if the event store cannot provide a location for `id` and
    /// `scheme`, or if the transport reports an error while sending.
    pub async fn send_message_to(
        &self,
        id: &IdentifierPrefix,
        scheme: Scheme,
        msg: Message,
    ) -> Result<(), ControllerError> {
        let destination = self.events.find_location(id, scheme)?;
        self.transport.send_message(destination, msg).await?;
        Ok(())
    }

    /// Sends a signed KEL query to the location stored for `id` under
    /// `scheme` and returns the transport's response.
    ///
    /// # Errors
    ///
    /// Fails if the event store cannot provide a location for `id` and
    /// `scheme`, or if the transport reports an error for the query.
    pub async fn send_query_to(
        &self,
        id: &IdentifierPrefix,
        scheme: Scheme,
        query: SignedKelQuery,
    ) -> Result<PossibleResponse, ControllerError> {
        let destination = self.events.find_location(id, scheme)?;
        let response = self.transport.send_query(destination, query).await?;
        Ok(response)
    }

    /// Asks the actor found at the location stored for `id` under `scheme`
    /// to resolve `oobi`.
    ///
    /// # Errors
    ///
    /// Fails if the event store cannot provide a location for `id` and
    /// `scheme`, or if the transport reports an error.
    async fn send_oobi_to(
        &self,
        id: &IdentifierPrefix,
        scheme: Scheme,
        oobi: Oobi,
    ) -> Result<(), ControllerError> {
        let destination = self.events.find_location(id, scheme)?;
        self.transport.resolve_oobi(destination, oobi).await?;
        Ok(())
    }

    /// Publishes a key event to the given witnesses.
    ///
    ///  1. The event is sent over HTTP to every witness in `witness_prefixes`.
    ///  2. Collecting witness receipts from a mailbox is not implemented yet
    ///     (see the TODO in the loop).
    ///  3. If the event store already holds a receipt for this event, that
    ///     receipt is sent to every witness as well.
    ///
    /// # Errors
    ///
    /// Stops at the first failure: a send that fails, a digest that cannot be
    /// computed, or a receipt lookup that fails. Witnesses later in the slice
    /// are then not contacted.
    pub async fn publish(
        &self,
        witness_prefixes: &[BasicPrefix],
        message: &SignedEventMessage,
    ) -> Result<(), ControllerError> {
        for witness in witness_prefixes {
            let recipient = IdentifierPrefix::Basic(witness.clone());
            let notice = Message::Notice(Notice::Event(message.clone()));
            self.send_message_to(&recipient, Scheme::Http, notice)
                .await?;
            // TODO: get receipts from mailbox: send a query message for the
            // receipt mailbox and process every collected receipt.
        }

        // Look the receipt up in the database rather than forwarding receipts
        // one by one: the stored entry is a single receipt carrying all
        // witness signatures as one attachment.
        let event = &message.event_message;
        let prefix = event.data.get_prefix();
        let sn = event.data.get_sn();
        let digest = event.digest()?;
        let receipt = match self.events.find_receipt(&prefix, sn, &digest)? {
            Some(stored) => stored,
            // Nothing receipted yet, so there is nothing to distribute.
            None => return Ok(()),
        };

        // Send the receipt to all witnesses.
        for witness in witness_prefixes {
            let recipient = IdentifierPrefix::Basic(witness.clone());
            let notice = Message::Notice(Notice::NontransferableRct(receipt.clone()));
            self.send_message_to(&recipient, Scheme::Http, notice)
                .await?;
        }

        Ok(())
    }

    /// Sends an identifier's endpoint information to that identifier's
    /// watchers.
    ///
    /// Every watcher the event store returns for `id` receives its own clone
    /// of `oobi` over HTTP.
    ///
    /// # Errors
    ///
    /// Fails if the watcher lookup fails, or at the first watcher the OOBI
    /// cannot be delivered to; remaining watchers are then skipped.
    // TODO use stream instead of json
    pub async fn send_oobi_to_watcher(
        &self,
        id: &IdentifierPrefix,
        oobi: &Oobi,
    ) -> Result<(), ControllerError> {
        let watchers = self.events.get_watchers(id)?;
        for watcher in watchers.iter() {
            let payload = oobi.clone();
            self.send_oobi_to(watcher, Scheme::Http, payload).await?;
        }

        Ok(())
    }
}