1use std::path::PathBuf;
2use std::sync::Arc;
3
4use keri_core::actor::parse_event_stream;
5use keri_core::database::redb::{RedbDatabase, RedbError};
6use keri_core::error::Error;
7use keri_core::event_message::signed_event_message::SignedNontransferableReceipt;
8use keri_core::oobi::LocationScheme;
9use keri_core::prefix::{BasicPrefix, IdentifierPrefix, IndexedSignature, SelfSigningPrefix};
10
11use keri_core::processor::escrow::partially_witnessed_escrow::PartiallyWitnessedEscrow;
12use keri_core::processor::escrow::EscrowConfig;
13use keri_core::processor::notification::JustNotification;
14
15use keri_core::processor::Processor;
16use keri_core::state::IdentifierState;
17use keri_core::{
18 actor::{self, event_generator, prelude::SelfAddressingIdentifier},
19 event::{event_data::EventData, sections::seal::Seal, KeyEvent},
20 event_message::{
21 cesr_adapter::{parse_event_type, EventType},
22 msg::KeriEvent,
23 signed_event_message::{Message, Notice, Op},
24 },
25 oobi::{Role, Scheme},
26 oobi_manager::OobiManager,
27 processor::{
28 basic_processor::BasicProcessor, escrow::default_escrow_bus, event_storage::EventStorage,
29 },
30 query::reply_event::{ReplyEvent, ReplyRoute, SignedReply},
31};
32use teliox::database::redb::RedbTelDatabase;
33use teliox::database::{EscrowDatabase, TelEventDatabase};
34use teliox::processor::escrow::default_escrow_bus as tel_escrow_bus;
35use teliox::processor::storage::TelEventStorage;
36use teliox::tel::Tel;
37
38use crate::error::ControllerError;
39use crate::identifier::mechanics::MechanicsError;
40
41#[derive(Debug, thiserror::Error)]
42pub enum OobiRetrieveError {
43 #[error("No oobi for {0} identifier")]
44 MissingOobi(IdentifierPrefix, Option<Scheme>),
45 #[error(transparent)]
46 DbError(#[from] RedbError),
47}
48
49pub struct KnownEvents {
50 processor: BasicProcessor<RedbDatabase>,
51 pub storage: Arc<EventStorage<RedbDatabase>>,
52 pub oobi_manager: OobiManager,
53 pub partially_witnessed_escrow: Arc<PartiallyWitnessedEscrow<RedbDatabase>>,
54 pub tel: Arc<Tel<RedbTelDatabase, RedbDatabase>>,
55}
56
57impl KnownEvents {
58 pub fn new(db_path: PathBuf, escrow_config: EscrowConfig) -> Result<Self, ControllerError> {
59 let event_database = {
60 let mut path = db_path.clone();
61 path.push("events_database");
62 Arc::new(RedbDatabase::new(&path)?)
63 };
64
65 let oobi_manager = OobiManager::new(event_database.clone());
66
67 let (
68 mut notification_bus,
69 (
70 _out_of_order_escrow,
71 _partially_signed_escrow,
72 partially_witnessed_escrow,
73 _delegation_escrow,
74 _duplicates,
75 ),
76 ) = default_escrow_bus(event_database.clone(), escrow_config);
77
78 let kel_storage = Arc::new(EventStorage::new(event_database.clone()));
79
80 let tel_events_db = {
82 let mut path = db_path.clone();
83 path.push("tel");
84 path.push("events");
85 Arc::new(RedbTelDatabase::new(&path)?)
86 };
87
88 let tel_escrow_db = {
89 let mut path = db_path.clone();
90 path.push("tel");
91 path.push("escrow");
92 EscrowDatabase::new(&path).map_err(|e| ControllerError::OtherError(e.to_string()))?
93 };
94 let (tel_bus, missing_issuer, _out_of_order, _missing_registy) =
95 tel_escrow_bus(tel_events_db.clone(), kel_storage.clone(), tel_escrow_db)?;
96
97 let tel_storage = Arc::new(TelEventStorage::new(tel_events_db.clone()));
98 let tel = Arc::new(Tel::new(tel_storage, kel_storage.clone(), Some(tel_bus)));
99
100 notification_bus.register_observer(
101 missing_issuer.clone(),
102 vec![JustNotification::KeyEventAdded],
103 );
104
105 let controller = Self {
106 processor: BasicProcessor::new(event_database.clone(), Some(notification_bus)),
107 storage: kel_storage,
108 oobi_manager,
109 partially_witnessed_escrow,
110 tel,
111 };
112
113 Ok(controller)
114 }
115
116 pub fn save(&self, message: &Message) -> Result<(), MechanicsError> {
117 self.process(message)?;
118 Ok(())
119 }
120
121 pub fn save_oobi(&self, oobi: &SignedReply) -> Result<(), MechanicsError> {
122 Ok(self.oobi_manager.process_oobi(oobi)?)
123 }
124
125 pub fn current_public_keys(
126 &self,
127 id: &IdentifierPrefix,
128 ) -> Result<Vec<BasicPrefix>, MechanicsError> {
129 Ok(self
130 .storage
131 .get_state(id)
132 .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))?
133 .current
134 .public_keys)
135 }
136
137 pub fn next_keys_hashes(
138 &self,
139 id: &IdentifierPrefix,
140 ) -> Result<Vec<SelfAddressingIdentifier>, MechanicsError> {
141 Ok(self
142 .storage
143 .get_state(id)
144 .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))?
145 .current
146 .next_keys_data
147 .next_keys_hashes())
148 }
149
150 pub fn get_watchers(
151 &self,
152 id: &IdentifierPrefix,
153 ) -> Result<Vec<IdentifierPrefix>, ControllerError> {
154 Ok(self
155 .oobi_manager
156 .get_end_role(id, Role::Watcher)?
157 .unwrap_or_default()
158 .into_iter()
159 .filter_map(|r| {
160 if let ReplyRoute::EndRoleAdd(adds) = r.reply.get_route() {
161 Some(adds.eid)
162 } else {
163 None
164 }
165 })
166 .collect::<Vec<_>>())
167 }
168
169 pub fn process(&self, msg: &Message) -> Result<Option<Vec<Message>>, Error> {
171 let response = match msg.clone() {
172 Message::Op(op) => match op {
173 Op::Reply(rpy) => {
174 actor::process_reply(rpy, &self.oobi_manager, &self.processor, &self.storage)?;
175 None
176 }
177 Op::Query(_) => {
178 None
180 }
181 Op::Exchange(_) => todo!(),
182 },
183 Message::Notice(notice) => {
184 self.processor.process_notice(¬ice)?;
185 None
186 }
187 };
188
189 Ok(response)
190 }
191
192 pub fn process_stream(&self, stream: &[u8]) -> Result<(), ControllerError> {
194 let messages = parse_event_stream(stream)?;
195 for message in messages {
196 self.process(&message)?;
197 }
198 Ok(())
199 }
200
201 pub fn get_loc_schemas(
203 &self,
204 id: &IdentifierPrefix,
205 ) -> Result<Vec<LocationScheme>, OobiRetrieveError> {
206 let location_schemas: Vec<_> = self
207 .oobi_manager
208 .get_loc_scheme(id)?
209 .iter()
210 .filter_map(|lc| {
211 if let ReplyRoute::LocScheme(loc_scheme) = lc.get_route() {
212 Some(loc_scheme)
213 } else {
214 None
215 }
216 })
217 .collect();
218 if location_schemas.is_empty() {
219 Err(OobiRetrieveError::MissingOobi(id.clone(), None))
220 } else {
221 Ok(location_schemas)
222 }
223 }
224
225 pub fn find_location(
226 &self,
227 id: &IdentifierPrefix,
228 scheme: Scheme,
229 ) -> Result<LocationScheme, OobiRetrieveError> {
230 self.get_loc_schemas(id)?
231 .into_iter()
232 .find(|loc| loc.scheme == scheme)
233 .ok_or(OobiRetrieveError::MissingOobi(id.clone(), Some(scheme)))
234 }
235
236 pub fn find_receipt(
237 &self,
238 id: &IdentifierPrefix,
239 sn: u64,
240 digest: &SelfAddressingIdentifier,
241 ) -> Result<Option<SignedNontransferableReceipt>, Error> {
242 let rcts_from_db = self.storage.get_nt_receipts(id, sn)?;
243 match &rcts_from_db {
244 Some(rct) => {
245 if rct.body.receipted_event_digest.eq(digest) {
246 Ok(rcts_from_db)
247 } else {
248 Ok(None)
249 }
250 }
251 None => Ok(None),
252 }
253 }
254
255 pub fn find_kel_with_receipts(&self, id: &IdentifierPrefix) -> Option<Vec<Notice>> {
256 self.storage.get_kel_messages_with_receipts_all(id).unwrap()
257 }
258
259 pub fn find_kel(&self, id: &IdentifierPrefix) -> Option<String> {
260 self.storage
261 .get_kel(id)
262 .unwrap()
263 .map(|kel| String::from_utf8(kel).unwrap())
264 }
265
266 pub fn incept(
267 &self,
268 public_keys: Vec<BasicPrefix>,
269 next_pub_keys: Vec<BasicPrefix>,
270 witnesses: Vec<LocationScheme>,
271 witness_threshold: u64,
272 ) -> Result<String, MechanicsError> {
273 let witnesses = witnesses
274 .iter()
275 .map(|wit| {
276 if let IdentifierPrefix::Basic(bp) = &wit.eid {
277 Ok(bp.clone())
278 } else {
279 Err(MechanicsError::WrongWitnessPrefixError)
280 }
281 })
282 .collect::<Result<Vec<_>, _>>()?;
283 event_generator::incept(
284 public_keys,
285 next_pub_keys,
286 witnesses,
287 witness_threshold,
288 None,
289 )
290 .map_err(|e| MechanicsError::EventGenerationError(e.to_string()))
291 }
292
293 pub fn finalize_inception(
298 &self,
299 event: &[u8],
300 sig: &SelfSigningPrefix,
301 ) -> Result<IdentifierPrefix, MechanicsError> {
302 let parsed_event =
303 parse_event_type(event).map_err(|_e| MechanicsError::EventFormatError)?;
304 match parsed_event {
305 EventType::KeyEvent(ke) => {
306 if let EventData::Icp(_) = &ke.data.get_event_data() {
307 self.finalize_key_event(&ke, sig, 0)?;
309 Ok(ke.data.get_prefix())
310 } else {
311 Err(MechanicsError::InceptionError(
312 "Wrong event type, should be inception event".into(),
313 ))
314 }
315 }
316 _ => Err(MechanicsError::InceptionError(
317 "Wrong event type, should be inception event".into(),
318 )),
319 }
320 }
321
322 pub fn anchor_with_seal(
377 &self,
378 id: &IdentifierPrefix,
379 payload: &[Seal],
380 ) -> Result<KeriEvent<KeyEvent>, MechanicsError> {
381 let state = self
382 .storage
383 .get_state(id)
384 .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))?;
385 event_generator::anchor_with_seal(state, payload)
386 .map_err(|e| MechanicsError::EventGenerationError(e.to_string()))
387 }
388
389 pub fn get_current_witness_list(
390 &self,
391 id: &IdentifierPrefix,
392 ) -> Result<Vec<BasicPrefix>, MechanicsError> {
393 Ok(self
394 .storage
395 .get_state(id)
396 .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))?
397 .witness_config
398 .witnesses)
399 }
400
401 pub fn finalize_key_event(
404 &self,
405 event: &KeriEvent<KeyEvent>,
406 sig: &SelfSigningPrefix,
407 own_index: usize,
408 ) -> Result<(), MechanicsError> {
409 let signature = IndexedSignature::new_both_same(sig.clone(), own_index as u16);
410
411 let signed_message = event.sign(vec![signature], None, None);
412 self.process(&Message::Notice(Notice::Event(signed_message)))?;
414
415 Ok(())
416 }
417
418 pub fn get_state_at_event(
419 &self,
420 event_message: &KeriEvent<KeyEvent>,
421 ) -> Result<IdentifierState, MechanicsError> {
422 let identifier = event_message.data.get_prefix();
423 Ok(match event_message.data.get_event_data() {
424 EventData::Icp(_icp) => IdentifierState::default().apply(event_message)?,
425 EventData::Rot(_rot) => self
426 .storage
427 .get_state(&identifier)
428 .ok_or(MechanicsError::UnknownIdentifierError(identifier))?
429 .apply(event_message)?,
430 EventData::Ixn(_ixn) => self
431 .storage
432 .get_state(&identifier)
433 .ok_or(MechanicsError::UnknownIdentifierError(identifier))?,
434 EventData::Dip(_dip) => IdentifierState::default().apply(event_message)?,
435 EventData::Drt(_drt) => self
436 .storage
437 .get_state(&identifier)
438 .ok_or(MechanicsError::UnknownIdentifierError(identifier))?
439 .apply(event_message)?,
440 })
441 }
442
443 pub fn find_witnesses_at_event(
444 &self,
445 event_message: &KeriEvent<KeyEvent>,
446 ) -> Result<Vec<BasicPrefix>, MechanicsError> {
447 let state = self.get_state_at_event(event_message)?;
448 Ok(state.witness_config.witnesses)
449 }
450
451 pub fn finalize_add_role(
452 &self,
453 signer_prefix: &IdentifierPrefix,
454 event: ReplyEvent,
455 sig: Vec<SelfSigningPrefix>,
456 ) -> Result<(IdentifierPrefix, Vec<Message>), MechanicsError> {
457 let mut messages_to_send = vec![];
458 let (dest_prefix, role) = match &event.data.data {
459 ReplyRoute::EndRoleAdd(role) => (role.eid.clone(), role.role.clone()),
460 ReplyRoute::EndRoleCut(role) => (role.eid.clone(), role.role.clone()),
461 _ => return Err(MechanicsError::EventFormatError),
462 };
463 let signed_reply = match signer_prefix {
464 IdentifierPrefix::Basic(bp) => Message::Op(Op::Reply(SignedReply::new_nontrans(
465 event,
466 bp.clone(),
467 sig[0].clone(),
468 ))),
469 _ => {
470 let sigs = sig
471 .into_iter()
472 .enumerate()
473 .map(|(i, sig)| IndexedSignature::new_both_same(sig, i as u16))
474 .collect();
475
476 let signed_rpy = Message::Op(Op::Reply(SignedReply::new_trans(
477 event,
478 self.storage
479 .get_last_establishment_event_seal(signer_prefix)
480 .ok_or(MechanicsError::UnknownIdentifierError(
481 signer_prefix.clone(),
482 ))?,
483 sigs,
484 )));
485 if Role::Messagebox != role {
486 let kel = self
487 .storage
488 .get_kel_messages_with_receipts_all(signer_prefix)?
489 .ok_or(MechanicsError::UnknownIdentifierError(
490 signer_prefix.clone(),
491 ))?;
492
493 for ev in kel {
494 messages_to_send.push(Message::Notice(ev));
495 }
496 };
497 signed_rpy
498 }
499 };
500
501 self.process(&signed_reply)?;
502
503 messages_to_send.push(signed_reply.clone());
504
505 Ok((dest_prefix, messages_to_send))
506 }
507
508 pub fn get_state(&self, id: &IdentifierPrefix) -> Result<IdentifierState, MechanicsError> {
509 self.storage
510 .get_state(id)
511 .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))
512 }
513}