keri_controller/identifier/
query.rs1use std::collections::HashSet;
2
3use crate::communication::SendingError;
4use crate::error::ControllerError;
5use futures::future::join_all;
6use keri_core::actor::error::ActorError;
7use keri_core::actor::possible_response::PossibleResponse;
8use keri_core::actor::prelude::HashFunctionCode;
9use keri_core::error::Error;
10use keri_core::oobi::Scheme;
11use keri_core::prefix::IndexedSignature;
12use keri_core::query::query_event::SignedKelQuery;
13use keri_core::{
14 actor::prelude::SerializationFormats,
15 event::sections::seal::EventSeal,
16 prefix::{IdentifierPrefix, SelfSigningPrefix},
17 query::query_event::{LogsQueryArgs, QueryEvent, QueryRoute},
18};
19
20use super::Identifier;
21
/// Outcome of finalizing a batch of watcher queries: whether any identifier
/// state known locally changed as a result of the responses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryResponse {
    /// At least one identifier's state changed.
    Updates,
    /// No identifier's state changed.
    NoUpdates,
}
27
28#[derive(thiserror::Error, Debug)]
29pub enum WatcherResponseError {
30 #[error("Unexpected watcher response")]
31 UnexpectedResponse,
32 #[error("Watcher response processing error: {0:?}")]
33 ResponseProcessingError(Vec<keri_core::error::Error>),
34 #[error(transparent)]
35 SendingError(#[from] SendingError),
36 #[error("KEL of {0} not found")]
37 KELNotFound(IdentifierPrefix),
38 #[error("Poison error")]
39 PoisonError,
40}
41
42impl Identifier {
43 pub fn query_watchers(
44 &self,
45 about_who: &EventSeal,
46 ) -> Result<Vec<QueryEvent>, ControllerError> {
47 self.known_events
48 .get_watchers(&self.id)?
49 .into_iter()
50 .map(|watcher| self.query_log_range(&about_who.prefix, 0, about_who.sn + 1, watcher))
51 .collect()
52 }
53
54 async fn finalize_single_query(
55 &self,
56 qry: QueryEvent,
57 sig: SelfSigningPrefix,
58 ) -> Result<HashSet<IdentifierPrefix>, WatcherResponseError> {
59 match self.handle_query(qry, sig).await {
60 Ok(PossibleResponse::Kel(kel)) => {
61 let mut possibly_updated_ids = HashSet::new();
62 let errs = kel
63 .into_iter()
64 .filter_map(|event| {
65 let id = event.get_prefix();
66 possibly_updated_ids.insert(id);
67 match self.known_events.process(&event) {
68 Ok(_) => None,
69 Err(err) => Some(err),
70 }
71 })
72 .collect::<Vec<Error>>();
73 if errs.is_empty() {
74 Ok(possibly_updated_ids)
75 } else {
76 Err(WatcherResponseError::ResponseProcessingError(errs))
77 }
78 }
79 Ok(PossibleResponse::Mbx(_mbx)) => Err(WatcherResponseError::UnexpectedResponse),
80 Ok(PossibleResponse::Ksn(_)) => Err(WatcherResponseError::UnexpectedResponse),
81 Err(SendingError::ActorInternalError(ActorError::NotFound(id))) => {
82 Err(WatcherResponseError::KELNotFound(id))
83 }
84 Err(e) => Err(e.into()),
85 }
86 }
87
88 pub async fn finalize_query(
93 &self,
94 queries: Vec<(QueryEvent, SelfSigningPrefix)>,
95 ) -> (QueryResponse, Vec<WatcherResponseError>) {
96 let mut updates = QueryResponse::NoUpdates;
97 let res = join_all(
98 queries
99 .into_iter()
100 .map(|(qry, sig)| self.finalize_single_query(qry, sig)),
101 )
102 .await;
103
104 let (possibly_updated_ids, mut errs) =
105 res.into_iter()
106 .fold(
107 (HashSet::new(), vec![]),
108 |(mut oks, mut errs), result| match result {
109 Ok(set) => {
110 for id in set {
111 oks.insert(id);
112 }
113 (oks, errs)
114 }
115 Err(e) => {
116 errs.push(e);
117 (oks, errs)
118 }
119 },
120 );
121
122 for id in possibly_updated_ids {
123 let db_state = self.find_state(&id).ok();
124
125 let cached_state = match self.cached_identifiers.lock() {
126 Ok(ids) => ids.get(&id).map(|a| a.clone()),
127 Err(_e) => {
128 errs.push(WatcherResponseError::PoisonError);
129 None
130 }
131 };
132
133 if db_state.as_ref().eq(&cached_state.as_ref()) {
134 updates = QueryResponse::NoUpdates
135 } else {
136 match self.cached_identifiers.lock() {
137 Ok(mut ids) => {
138 ids.insert(id, db_state.unwrap());
139 }
140 Err(_e) => errs.push(WatcherResponseError::PoisonError),
141 };
142 updates = QueryResponse::Updates
143 }
144 }
145 (updates, errs)
146 }
147
148 async fn handle_query(
150 &self,
151 qry: QueryEvent,
152 sig: SelfSigningPrefix,
153 ) -> Result<PossibleResponse, SendingError> {
154 let recipient = match qry.get_route() {
155 QueryRoute::Logs {
156 reply_route: _,
157 args,
158 } => args.src.clone(),
159 QueryRoute::Ksn {
160 reply_route: _,
161 args,
162 } => args.src.clone(),
163 };
164
165 let query = match &self.id {
166 IdentifierPrefix::Basic(bp) => {
167 SignedKelQuery::new_nontrans(qry.clone(), bp.clone(), sig)
168 }
169 _ => {
170 let signatures = vec![IndexedSignature::new_both_same(sig, 0)];
171 SignedKelQuery::new_trans(qry.clone(), self.id().clone(), signatures)
172 }
173 };
174 self.communication
175 .send_query_to(recipient.as_ref().unwrap(), Scheme::Http, query)
176 .await
177 }
178
179 fn query_log_range(
180 &self,
181 id: &IdentifierPrefix,
182 sn: u64,
183 limit: u64,
184 watcher: IdentifierPrefix,
185 ) -> Result<QueryEvent, ControllerError> {
186 Ok(QueryEvent::new_query(
187 QueryRoute::Logs {
188 reply_route: "".to_string(),
189 args: LogsQueryArgs {
190 s: Some(sn),
191 i: id.clone(),
192 src: Some(watcher),
193 limit: Some(limit),
194 },
195 },
196 SerializationFormats::JSON,
197 HashFunctionCode::Blake3_256,
198 ))
199 }
200
201 pub fn query_full_log(
202 &self,
203 id: &IdentifierPrefix,
204 watcher: IdentifierPrefix,
205 ) -> Result<QueryEvent, ControllerError> {
206 Ok(QueryEvent::new_query(
207 QueryRoute::Logs {
208 reply_route: "".to_string(),
209 args: LogsQueryArgs {
210 s: None,
211 i: id.clone(),
212 src: Some(watcher),
213 limit: None,
214 },
215 },
216 SerializationFormats::JSON,
217 HashFunctionCode::Blake3_256,
218 ))
219 }
220}