1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use std::collections::HashSet;

use crate::communication::SendingError;
use crate::error::ControllerError;
use keri_core::actor::prelude::HashFunctionCode;
use keri_core::oobi::Scheme;
use keri_core::prefix::IndexedSignature;
use keri_core::query::query_event::SignedKelQuery;
use keri_core::{
    actor::{prelude::SerializationFormats, simple_controller::PossibleResponse},
    event::sections::seal::EventSeal,
    prefix::{IdentifierPrefix, SelfSigningPrefix},
    query::query_event::{LogsQueryArgs, QueryEvent, QueryRoute},
};

use super::Identifier;

#[derive(Debug, PartialEq)]
pub enum QueryResponse {
    Updates,
    NoUpdates,
}

#[derive(thiserror::Error, Debug)]
pub enum WatcherResponseError {
    #[error("Unexpected watcher response")]
    UnexpectedResponse,
    #[error("Watcher response processing error: {0}")]
    ResponseProcessingError(#[from] keri_core::error::Error),
    #[error(transparent)]
    SendingError(#[from] SendingError),
}

impl Identifier {
    pub fn query_watchers(
        &self,
        about_who: &EventSeal,
    ) -> Result<Vec<QueryEvent>, ControllerError> {
        self.known_events
            .get_watchers(&self.id)?
            .into_iter()
            .map(|watcher| self.query_log(about_who, watcher))
            .collect()
    }

    /// Joins query events with their signatures, sends it to witness and
    /// process its response. If user action is needed to finalize process,
    /// returns proper notification.
    pub async fn finalize_query(
        &mut self,
        queries: Vec<(QueryEvent, SelfSigningPrefix)>,
    ) -> Result<QueryResponse, WatcherResponseError> {
        let mut updates = QueryResponse::NoUpdates;
        let mut possibly_updated_ids: HashSet<IdentifierPrefix> = HashSet::new();
        for (qry, sig) in queries {
            match self.handle_query(&qry, sig).await? {
                PossibleResponse::Kel(kel) => {
                    for event in kel {
                        let id = event.get_prefix();
                        possibly_updated_ids.insert(id);
                        self.known_events.process(&event)?;
                    }
                }
                PossibleResponse::Mbx(_mbx) => {
                    return Err(WatcherResponseError::UnexpectedResponse);
                }
                PossibleResponse::Ksn(_) => todo!(),
            };
        }
        for id in possibly_updated_ids {
            let db_state = self.find_state(&id).ok();
            let cached_state = self.cached_identifiers.get(&id);
            if db_state.as_ref().eq(&cached_state) {
                updates = QueryResponse::NoUpdates
            } else {
                self.cached_identifiers.insert(id, db_state.unwrap());
                updates = QueryResponse::Updates
            }
        }
        Ok(updates)
    }

    /// Joins query events with their signatures, sends it to witness.
    async fn handle_query(
        &self,
        qry: &QueryEvent,
        sig: SelfSigningPrefix,
    ) -> Result<PossibleResponse, SendingError> {
        let recipient = match qry.get_route() {
            QueryRoute::Logs {
                reply_route: _,
                args,
            } => args.src.clone(),
            QueryRoute::Ksn {
                reply_route: _,
                args,
            } => args.src.clone(),
        };

        let query = match &self.id {
            IdentifierPrefix::Basic(bp) => {
                SignedKelQuery::new_nontrans(qry.clone(), bp.clone(), sig)
            }
            _ => {
                let signatures = vec![IndexedSignature::new_both_same(sig, 0)];
                SignedKelQuery::new_trans(qry.clone(), self.id().clone(), signatures)
            }
        };
        self.communication
            .send_query_to(recipient.as_ref().unwrap(), Scheme::Http, query)
            .await
    }

    fn query_log(
        &self,
        seal: &EventSeal,
        watcher: IdentifierPrefix,
    ) -> Result<QueryEvent, ControllerError> {
        Ok(QueryEvent::new_query(
            QueryRoute::Logs {
                reply_route: "".to_string(),
                args: LogsQueryArgs {
                    s: Some(seal.sn),
                    i: seal.prefix.clone(),
                    src: Some(watcher),
                },
            },
            SerializationFormats::JSON,
            HashFunctionCode::Blake3_256,
        )?)
    }
}