keri_controller/identifier/
query.rs

1use std::collections::HashSet;
2
3use crate::communication::SendingError;
4use crate::error::ControllerError;
5use futures::future::join_all;
6use keri_core::actor::error::ActorError;
7use keri_core::actor::possible_response::PossibleResponse;
8use keri_core::actor::prelude::HashFunctionCode;
9use keri_core::error::Error;
10use keri_core::oobi::Scheme;
11use keri_core::prefix::IndexedSignature;
12use keri_core::query::query_event::SignedKelQuery;
13use keri_core::{
14    actor::prelude::SerializationFormats,
15    event::sections::seal::EventSeal,
16    prefix::{IdentifierPrefix, SelfSigningPrefix},
17    query::query_event::{LogsQueryArgs, QueryEvent, QueryRoute},
18};
19
20use super::Identifier;
21
22#[derive(Debug, PartialEq)]
23pub enum QueryResponse {
24    Updates,
25    NoUpdates,
26}
27
28#[derive(thiserror::Error, Debug)]
29pub enum WatcherResponseError {
30    #[error("Unexpected watcher response")]
31    UnexpectedResponse,
32    #[error("Watcher response processing error: {0:?}")]
33    ResponseProcessingError(Vec<keri_core::error::Error>),
34    #[error(transparent)]
35    SendingError(#[from] SendingError),
36    #[error("KEL of {0} not found")]
37    KELNotFound(IdentifierPrefix),
38    #[error("Poison error")]
39    PoisonError,
40}
41
42impl Identifier {
43    pub fn query_watchers(
44        &self,
45        about_who: &EventSeal,
46    ) -> Result<Vec<QueryEvent>, ControllerError> {
47        self.known_events
48            .get_watchers(&self.id)?
49            .into_iter()
50            .map(|watcher| self.query_log_range(&about_who.prefix, 0, about_who.sn + 1, watcher))
51            .collect()
52    }
53
54    async fn finalize_single_query(
55        &self,
56        qry: QueryEvent,
57        sig: SelfSigningPrefix,
58    ) -> Result<HashSet<IdentifierPrefix>, WatcherResponseError> {
59        match self.handle_query(qry, sig).await {
60            Ok(PossibleResponse::Kel(kel)) => {
61                let mut possibly_updated_ids = HashSet::new();
62                let errs = kel
63                    .into_iter()
64                    .filter_map(|event| {
65                        let id = event.get_prefix();
66                        possibly_updated_ids.insert(id);
67                        match self.known_events.process(&event) {
68                            Ok(_) => None,
69                            Err(err) => Some(err),
70                        }
71                    })
72                    .collect::<Vec<Error>>();
73                if errs.is_empty() {
74                    Ok(possibly_updated_ids)
75                } else {
76                    Err(WatcherResponseError::ResponseProcessingError(errs))
77                }
78            }
79            Ok(PossibleResponse::Mbx(_mbx)) => Err(WatcherResponseError::UnexpectedResponse),
80            Ok(PossibleResponse::Ksn(_)) => Err(WatcherResponseError::UnexpectedResponse),
81            Err(SendingError::ActorInternalError(ActorError::NotFound(id))) => {
82                Err(WatcherResponseError::KELNotFound(id))
83            }
84            Err(e) => Err(e.into()),
85        }
86    }
87
88    /// Joins query events with their signatures, sends it to recipient and
89    /// process its response. Returns a tuple containing two elements:
90    ///     1. A notification if any identifier's KEL (Key Event Log) was updated.
91    ///     2. A list of errors that occurred either during sending or on the recipient side.
92    pub async fn finalize_query(
93        &self,
94        queries: Vec<(QueryEvent, SelfSigningPrefix)>,
95    ) -> (QueryResponse, Vec<WatcherResponseError>) {
96        let mut updates = QueryResponse::NoUpdates;
97        let res = join_all(
98            queries
99                .into_iter()
100                .map(|(qry, sig)| self.finalize_single_query(qry, sig)),
101        )
102        .await;
103
104        let (possibly_updated_ids, mut errs) =
105            res.into_iter()
106                .fold(
107                    (HashSet::new(), vec![]),
108                    |(mut oks, mut errs), result| match result {
109                        Ok(set) => {
110                            for id in set {
111                                oks.insert(id);
112                            }
113                            (oks, errs)
114                        }
115                        Err(e) => {
116                            errs.push(e);
117                            (oks, errs)
118                        }
119                    },
120                );
121
122        for id in possibly_updated_ids {
123            let db_state = self.find_state(&id).ok();
124
125            let cached_state = match self.cached_identifiers.lock() {
126                Ok(ids) => ids.get(&id).map(|a| a.clone()),
127                Err(_e) => {
128                    errs.push(WatcherResponseError::PoisonError);
129                    None
130                }
131            };
132
133            if db_state.as_ref().eq(&cached_state.as_ref()) {
134                updates = QueryResponse::NoUpdates
135            } else {
136                match self.cached_identifiers.lock() {
137                    Ok(mut ids) => {
138                        ids.insert(id, db_state.unwrap());
139                    }
140                    Err(_e) => errs.push(WatcherResponseError::PoisonError),
141                };
142                updates = QueryResponse::Updates
143            }
144        }
145        (updates, errs)
146    }
147
148    /// Joins query events with their signatures, sends it to witness.
149    async fn handle_query(
150        &self,
151        qry: QueryEvent,
152        sig: SelfSigningPrefix,
153    ) -> Result<PossibleResponse, SendingError> {
154        let recipient = match qry.get_route() {
155            QueryRoute::Logs {
156                reply_route: _,
157                args,
158            } => args.src.clone(),
159            QueryRoute::Ksn {
160                reply_route: _,
161                args,
162            } => args.src.clone(),
163        };
164
165        let query = match &self.id {
166            IdentifierPrefix::Basic(bp) => {
167                SignedKelQuery::new_nontrans(qry.clone(), bp.clone(), sig)
168            }
169            _ => {
170                let signatures = vec![IndexedSignature::new_both_same(sig, 0)];
171                SignedKelQuery::new_trans(qry.clone(), self.id().clone(), signatures)
172            }
173        };
174        self.communication
175            .send_query_to(recipient.as_ref().unwrap(), Scheme::Http, query)
176            .await
177    }
178
179    fn query_log_range(
180        &self,
181        id: &IdentifierPrefix,
182        sn: u64,
183        limit: u64,
184        watcher: IdentifierPrefix,
185    ) -> Result<QueryEvent, ControllerError> {
186        Ok(QueryEvent::new_query(
187            QueryRoute::Logs {
188                reply_route: "".to_string(),
189                args: LogsQueryArgs {
190                    s: Some(sn),
191                    i: id.clone(),
192                    src: Some(watcher),
193                    limit: Some(limit),
194                },
195            },
196            SerializationFormats::JSON,
197            HashFunctionCode::Blake3_256,
198        ))
199    }
200
201    pub fn query_full_log(
202        &self,
203        id: &IdentifierPrefix,
204        watcher: IdentifierPrefix,
205    ) -> Result<QueryEvent, ControllerError> {
206        Ok(QueryEvent::new_query(
207            QueryRoute::Logs {
208                reply_route: "".to_string(),
209                args: LogsQueryArgs {
210                    s: None,
211                    i: id.clone(),
212                    src: Some(watcher),
213                    limit: None,
214                },
215            },
216            SerializationFormats::JSON,
217            HashFunctionCode::Blake3_256,
218        ))
219    }
220}