keri_controller/
known_events.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use keri_core::actor::parse_event_stream;
5use keri_core::database::redb::{RedbDatabase, RedbError};
6use keri_core::error::Error;
7use keri_core::event_message::signed_event_message::SignedNontransferableReceipt;
8use keri_core::oobi::LocationScheme;
9use keri_core::prefix::{BasicPrefix, IdentifierPrefix, IndexedSignature, SelfSigningPrefix};
10
11use keri_core::processor::escrow::partially_witnessed_escrow::PartiallyWitnessedEscrow;
12use keri_core::processor::escrow::EscrowConfig;
13use keri_core::processor::notification::JustNotification;
14
15use keri_core::processor::Processor;
16use keri_core::state::IdentifierState;
17use keri_core::{
18    actor::{self, event_generator, prelude::SelfAddressingIdentifier},
19    event::{event_data::EventData, sections::seal::Seal, KeyEvent},
20    event_message::{
21        cesr_adapter::{parse_event_type, EventType},
22        msg::KeriEvent,
23        signed_event_message::{Message, Notice, Op},
24    },
25    oobi::{Role, Scheme},
26    oobi_manager::OobiManager,
27    processor::{
28        basic_processor::BasicProcessor, escrow::default_escrow_bus, event_storage::EventStorage,
29    },
30    query::reply_event::{ReplyEvent, ReplyRoute, SignedReply},
31};
32use teliox::database::redb::RedbTelDatabase;
33use teliox::database::{EscrowDatabase, TelEventDatabase};
34use teliox::processor::escrow::default_escrow_bus as tel_escrow_bus;
35use teliox::processor::storage::TelEventStorage;
36use teliox::tel::Tel;
37
38use crate::error::ControllerError;
39use crate::identifier::mechanics::MechanicsError;
40
41#[derive(Debug, thiserror::Error)]
42pub enum OobiRetrieveError {
43    #[error("No oobi for {0} identifier")]
44    MissingOobi(IdentifierPrefix, Option<Scheme>),
45    #[error(transparent)]
46    DbError(#[from] RedbError),
47}
48
49pub struct KnownEvents {
50    processor: BasicProcessor<RedbDatabase>,
51    pub storage: Arc<EventStorage<RedbDatabase>>,
52    pub oobi_manager: OobiManager,
53    pub partially_witnessed_escrow: Arc<PartiallyWitnessedEscrow<RedbDatabase>>,
54    pub tel: Arc<Tel<RedbTelDatabase, RedbDatabase>>,
55}
56
57impl KnownEvents {
58    pub fn new(db_path: PathBuf, escrow_config: EscrowConfig) -> Result<Self, ControllerError> {
59        let event_database = {
60            let mut path = db_path.clone();
61            path.push("events_database");
62            Arc::new(RedbDatabase::new(&path)?)
63        };
64
65        let oobi_manager = OobiManager::new(event_database.clone());
66
67        let (
68            mut notification_bus,
69            (
70                _out_of_order_escrow,
71                _partially_signed_escrow,
72                partially_witnessed_escrow,
73                _delegation_escrow,
74                _duplicates,
75            ),
76        ) = default_escrow_bus(event_database.clone(), escrow_config);
77
78        let kel_storage = Arc::new(EventStorage::new(event_database.clone()));
79
80        // Initiate tel and it's escrows
81        let tel_events_db = {
82            let mut path = db_path.clone();
83            path.push("tel");
84            path.push("events");
85            Arc::new(RedbTelDatabase::new(&path)?)
86        };
87
88        let tel_escrow_db = {
89            let mut path = db_path.clone();
90            path.push("tel");
91            path.push("escrow");
92            EscrowDatabase::new(&path).map_err(|e| ControllerError::OtherError(e.to_string()))?
93        };
94        let (tel_bus, missing_issuer, _out_of_order, _missing_registy) =
95            tel_escrow_bus(tel_events_db.clone(), kel_storage.clone(), tel_escrow_db)?;
96
97        let tel_storage = Arc::new(TelEventStorage::new(tel_events_db.clone()));
98        let tel = Arc::new(Tel::new(tel_storage, kel_storage.clone(), Some(tel_bus)));
99
100        notification_bus.register_observer(
101            missing_issuer.clone(),
102            vec![JustNotification::KeyEventAdded],
103        );
104
105        let controller = Self {
106            processor: BasicProcessor::new(event_database.clone(), Some(notification_bus)),
107            storage: kel_storage,
108            oobi_manager,
109            partially_witnessed_escrow,
110            tel,
111        };
112
113        Ok(controller)
114    }
115
116    pub fn save(&self, message: &Message) -> Result<(), MechanicsError> {
117        self.process(message)?;
118        Ok(())
119    }
120
121    pub fn save_oobi(&self, oobi: &SignedReply) -> Result<(), MechanicsError> {
122        Ok(self.oobi_manager.process_oobi(oobi)?)
123    }
124
125    pub fn current_public_keys(
126        &self,
127        id: &IdentifierPrefix,
128    ) -> Result<Vec<BasicPrefix>, MechanicsError> {
129        Ok(self
130            .storage
131            .get_state(id)
132            .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))?
133            .current
134            .public_keys)
135    }
136
137    pub fn next_keys_hashes(
138        &self,
139        id: &IdentifierPrefix,
140    ) -> Result<Vec<SelfAddressingIdentifier>, MechanicsError> {
141        Ok(self
142            .storage
143            .get_state(id)
144            .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))?
145            .current
146            .next_keys_data
147            .next_keys_hashes())
148    }
149
150    pub fn get_watchers(
151        &self,
152        id: &IdentifierPrefix,
153    ) -> Result<Vec<IdentifierPrefix>, ControllerError> {
154        Ok(self
155            .oobi_manager
156            .get_end_role(id, Role::Watcher)?
157            .unwrap_or_default()
158            .into_iter()
159            .filter_map(|r| {
160                if let ReplyRoute::EndRoleAdd(adds) = r.reply.get_route() {
161                    Some(adds.eid)
162                } else {
163                    None
164                }
165            })
166            .collect::<Vec<_>>())
167    }
168
169    // Returns messages if they can be returned immediately, i.e. for query message
170    pub fn process(&self, msg: &Message) -> Result<Option<Vec<Message>>, Error> {
171        let response = match msg.clone() {
172            Message::Op(op) => match op {
173                Op::Reply(rpy) => {
174                    actor::process_reply(rpy, &self.oobi_manager, &self.processor, &self.storage)?;
175                    None
176                }
177                Op::Query(_) => {
178                    // TODO: Should controller respond to queries?
179                    None
180                }
181                Op::Exchange(_) => todo!(),
182            },
183            Message::Notice(notice) => {
184                self.processor.process_notice(&notice)?;
185                None
186            }
187        };
188
189        Ok(response)
190    }
191
192    /// Parse and process events stream
193    pub fn process_stream(&self, stream: &[u8]) -> Result<(), ControllerError> {
194        let messages = parse_event_stream(stream)?;
195        for message in messages {
196            self.process(&message)?;
197        }
198        Ok(())
199    }
200
201    /// Returns identifier contact information.
202    pub fn get_loc_schemas(
203        &self,
204        id: &IdentifierPrefix,
205    ) -> Result<Vec<LocationScheme>, OobiRetrieveError> {
206        let location_schemas: Vec<_> = self
207            .oobi_manager
208            .get_loc_scheme(id)?
209            .iter()
210            .filter_map(|lc| {
211                if let ReplyRoute::LocScheme(loc_scheme) = lc.get_route() {
212                    Some(loc_scheme)
213                } else {
214                    None
215                }
216            })
217            .collect();
218        if location_schemas.is_empty() {
219            Err(OobiRetrieveError::MissingOobi(id.clone(), None))
220        } else {
221            Ok(location_schemas)
222        }
223    }
224
225    pub fn find_location(
226        &self,
227        id: &IdentifierPrefix,
228        scheme: Scheme,
229    ) -> Result<LocationScheme, OobiRetrieveError> {
230        self.get_loc_schemas(id)?
231            .into_iter()
232            .find(|loc| loc.scheme == scheme)
233            .ok_or(OobiRetrieveError::MissingOobi(id.clone(), Some(scheme)))
234    }
235
236    pub fn find_receipt(
237        &self,
238        id: &IdentifierPrefix,
239        sn: u64,
240        digest: &SelfAddressingIdentifier,
241    ) -> Result<Option<SignedNontransferableReceipt>, Error> {
242        let rcts_from_db = self.storage.get_nt_receipts(id, sn)?;
243        match &rcts_from_db {
244            Some(rct) => {
245                if rct.body.receipted_event_digest.eq(digest) {
246                    Ok(rcts_from_db)
247                } else {
248                    Ok(None)
249                }
250            }
251            None => Ok(None),
252        }
253    }
254
255    pub fn find_kel_with_receipts(&self, id: &IdentifierPrefix) -> Option<Vec<Notice>> {
256        self.storage.get_kel_messages_with_receipts_all(id).unwrap()
257    }
258
259    pub fn find_kel(&self, id: &IdentifierPrefix) -> Option<String> {
260        self.storage
261            .get_kel(id)
262            .unwrap()
263            .map(|kel| String::from_utf8(kel).unwrap())
264    }
265
266    pub fn incept(
267        &self,
268        public_keys: Vec<BasicPrefix>,
269        next_pub_keys: Vec<BasicPrefix>,
270        witnesses: Vec<LocationScheme>,
271        witness_threshold: u64,
272    ) -> Result<String, MechanicsError> {
273        let witnesses = witnesses
274            .iter()
275            .map(|wit| {
276                if let IdentifierPrefix::Basic(bp) = &wit.eid {
277                    Ok(bp.clone())
278                } else {
279                    Err(MechanicsError::WrongWitnessPrefixError)
280                }
281            })
282            .collect::<Result<Vec<_>, _>>()?;
283        event_generator::incept(
284            public_keys,
285            next_pub_keys,
286            witnesses,
287            witness_threshold,
288            None,
289        )
290        .map_err(|e| MechanicsError::EventGenerationError(e.to_string()))
291    }
292
293    /// Verifies event signature and adds it to kel.
294    /// Returns new established identifier prefix.
295    /// Meant to be used for identifiers with one key pair.
296    /// Must call `IdentifierController::notify_witnesses` after calling this function.
297    pub fn finalize_inception(
298        &self,
299        event: &[u8],
300        sig: &SelfSigningPrefix,
301    ) -> Result<IdentifierPrefix, MechanicsError> {
302        let parsed_event =
303            parse_event_type(event).map_err(|_e| MechanicsError::EventFormatError)?;
304        match parsed_event {
305            EventType::KeyEvent(ke) => {
306                if let EventData::Icp(_) = &ke.data.get_event_data() {
307                    // TODO we assume here that provided signature matches 0th public key.
308                    self.finalize_key_event(&ke, sig, 0)?;
309                    Ok(ke.data.get_prefix())
310                } else {
311                    Err(MechanicsError::InceptionError(
312                        "Wrong event type, should be inception event".into(),
313                    ))
314                }
315            }
316            _ => Err(MechanicsError::InceptionError(
317                "Wrong event type, should be inception event".into(),
318            )),
319        }
320    }
321
322    /// Generate and return rotation event for given identifier data
323    // pub fn rotate(
324    //     &self,
325    //     id: IdentifierPrefix,
326    //     current_keys: Vec<BasicPrefix>,
327    //     new_next_keys: Vec<BasicPrefix>,
328    //     new_next_threshold: u64,
329    //     witness_to_add: Vec<LocationScheme>,
330    //     witness_to_remove: Vec<BasicPrefix>,
331    //     witness_threshold: u64,
332    // ) -> Result<String, ControllerError> {
333    //     let witnesses_to_add = witness_to_add
334    //         .iter()
335    //         .map(|wit| {
336    //             if let IdentifierPrefix::Basic(bp) = &wit.eid {
337    //                 Ok(bp.clone())
338    //             } else {
339    //                 Err(ControllerError::WrongWitnessPrefixError)
340    //             }
341    //         })
342    //         .collect::<Result<Vec<_>, _>>()?;
343
344    //     let state = self
345    //         .storage
346    //         .get_state(&id)
347    //         .ok_or(ControllerError::UnknownIdentifierError)?;
348
349    //     event_generator::rotate(
350    //         state,
351    //         current_keys,
352    //         new_next_keys,
353    //         new_next_threshold,
354    //         witnesses_to_add,
355    //         witness_to_remove,
356    //         witness_threshold,
357    //     )
358    //     .map_err(|e| ControllerError::EventGenerationError(e.to_string()))
359    // }
360
361    /// Generate and return interaction event for given identifier data
362    // pub fn anchor(
363    //     &self,
364    //     id: IdentifierPrefix,
365    //     payload: &[SelfAddressingIdentifier],
366    // ) -> Result<String, ControllerError> {
367    //     let state = self
368    //         .storage
369    //         .get_state(&id)
370    //         .ok_or(ControllerError::UnknownIdentifierError)?;
371    //     event_generator::anchor(state, payload)
372    //         .map_err(|e| ControllerError::EventGenerationError(e.to_string()))
373    // }
374
375    /// Generate and return interaction event for given identifier data
376    pub fn anchor_with_seal(
377        &self,
378        id: &IdentifierPrefix,
379        payload: &[Seal],
380    ) -> Result<KeriEvent<KeyEvent>, MechanicsError> {
381        let state = self
382            .storage
383            .get_state(id)
384            .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))?;
385        event_generator::anchor_with_seal(state, payload)
386            .map_err(|e| MechanicsError::EventGenerationError(e.to_string()))
387    }
388
389    pub fn get_current_witness_list(
390        &self,
391        id: &IdentifierPrefix,
392    ) -> Result<Vec<BasicPrefix>, MechanicsError> {
393        Ok(self
394            .storage
395            .get_state(id)
396            .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))?
397            .witness_config
398            .witnesses)
399    }
400
401    /// Adds signature to event and processes it.
402    /// Should call `IdentifierController::notify_witnesses` after calling this function.
403    pub fn finalize_key_event(
404        &self,
405        event: &KeriEvent<KeyEvent>,
406        sig: &SelfSigningPrefix,
407        own_index: usize,
408    ) -> Result<(), MechanicsError> {
409        let signature = IndexedSignature::new_both_same(sig.clone(), own_index as u16);
410
411        let signed_message = event.sign(vec![signature], None, None);
412        // self.processor.process_own_event(signed_message)?;
413        self.process(&Message::Notice(Notice::Event(signed_message)))?;
414
415        Ok(())
416    }
417
418    pub fn get_state_at_event(
419        &self,
420        event_message: &KeriEvent<KeyEvent>,
421    ) -> Result<IdentifierState, MechanicsError> {
422        let identifier = event_message.data.get_prefix();
423        Ok(match event_message.data.get_event_data() {
424            EventData::Icp(_icp) => IdentifierState::default().apply(event_message)?,
425            EventData::Rot(_rot) => self
426                .storage
427                .get_state(&identifier)
428                .ok_or(MechanicsError::UnknownIdentifierError(identifier))?
429                .apply(event_message)?,
430            EventData::Ixn(_ixn) => self
431                .storage
432                .get_state(&identifier)
433                .ok_or(MechanicsError::UnknownIdentifierError(identifier))?,
434            EventData::Dip(_dip) => IdentifierState::default().apply(event_message)?,
435            EventData::Drt(_drt) => self
436                .storage
437                .get_state(&identifier)
438                .ok_or(MechanicsError::UnknownIdentifierError(identifier))?
439                .apply(event_message)?,
440        })
441    }
442
443    pub fn find_witnesses_at_event(
444        &self,
445        event_message: &KeriEvent<KeyEvent>,
446    ) -> Result<Vec<BasicPrefix>, MechanicsError> {
447        let state = self.get_state_at_event(event_message)?;
448        Ok(state.witness_config.witnesses)
449    }
450
451    pub fn finalize_add_role(
452        &self,
453        signer_prefix: &IdentifierPrefix,
454        event: ReplyEvent,
455        sig: Vec<SelfSigningPrefix>,
456    ) -> Result<(IdentifierPrefix, Vec<Message>), MechanicsError> {
457        let mut messages_to_send = vec![];
458        let (dest_prefix, role) = match &event.data.data {
459            ReplyRoute::EndRoleAdd(role) => (role.eid.clone(), role.role.clone()),
460            ReplyRoute::EndRoleCut(role) => (role.eid.clone(), role.role.clone()),
461            _ => return Err(MechanicsError::EventFormatError),
462        };
463        let signed_reply = match signer_prefix {
464            IdentifierPrefix::Basic(bp) => Message::Op(Op::Reply(SignedReply::new_nontrans(
465                event,
466                bp.clone(),
467                sig[0].clone(),
468            ))),
469            _ => {
470                let sigs = sig
471                    .into_iter()
472                    .enumerate()
473                    .map(|(i, sig)| IndexedSignature::new_both_same(sig, i as u16))
474                    .collect();
475
476                let signed_rpy = Message::Op(Op::Reply(SignedReply::new_trans(
477                    event,
478                    self.storage
479                        .get_last_establishment_event_seal(signer_prefix)
480                        .ok_or(MechanicsError::UnknownIdentifierError(
481                            signer_prefix.clone(),
482                        ))?,
483                    sigs,
484                )));
485                if Role::Messagebox != role {
486                    let kel = self
487                        .storage
488                        .get_kel_messages_with_receipts_all(signer_prefix)?
489                        .ok_or(MechanicsError::UnknownIdentifierError(
490                            signer_prefix.clone(),
491                        ))?;
492
493                    for ev in kel {
494                        messages_to_send.push(Message::Notice(ev));
495                    }
496                };
497                signed_rpy
498            }
499        };
500
501        self.process(&signed_reply)?;
502
503        messages_to_send.push(signed_reply.clone());
504
505        Ok((dest_prefix, messages_to_send))
506    }
507
508    pub fn get_state(&self, id: &IdentifierPrefix) -> Result<IdentifierState, MechanicsError> {
509        self.storage
510            .get_state(id)
511            .ok_or(MechanicsError::UnknownIdentifierError(id.clone()))
512    }
513}