fedimint_eventlog/
lib.rs

1#![allow(clippy::needless_lifetimes)]
2
3//! Client Event Log
4//!
//! The goal here is to maintain a single, ordered, append-only
//! log of all important client-side events, low or high level,
//! and to handle as much of the coordination between different parts
//! of the system as possible through it, in a natural and decomposed way.
9//!
10//! Any event log "follower" can just keep going through
11//! all events and react to ones it is interested in (and understands),
12//! potentially emitting events of its own, and atomically updating persisted
13//! event log position ("cursor") of events that were already processed.
14use std::borrow::Cow;
15use std::str::FromStr;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use fedimint_core::core::{ModuleInstanceId, ModuleKind};
19use fedimint_core::db::{
20    Database, DatabaseKey, DatabaseRecord, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
21    NonCommittable,
22};
23use fedimint_core::encoding::{Decodable, Encodable};
24use fedimint_core::task::{MaybeSend, MaybeSync};
25use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
26use fedimint_logging::LOG_CLIENT_EVENT_LOG;
27use futures::{Future, StreamExt};
28use serde::{Deserialize, Serialize};
29use tokio::sync::{broadcast, watch};
30use tracing::{debug, trace};
31
/// DB prefixes hardcoded for use of the event log
///
/// `fedimint-eventlog` was extracted from `fedimint-client` to help
/// include/re-use in other parts of the code. But fundamentally its role
/// is to implement the event log in the client.
/// There is currently no way to inject the prefixes to use for db records,
/// so we use these constants to keep them in sync. Any other app that
/// wants to store its own event log will need to use the exact same prefixes,
/// which in practice should not be a problem.
pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
/// DB prefix of the final, ordered event log records.
pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
42
/// A typed event that can be written to the event log.
///
/// Implementors must be (de)serializable with `serde`;
/// [`DBTransactionEventLogExt::log_event`] encodes them as JSON.
pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
    /// Kind of the module defining this event, `None` if the event is
    /// not defined by a module.
    const MODULE: Option<ModuleKind>;
    /// Identifier of this kind of event.
    const KIND: EventKind;
    /// Whether the event is stored in the ordered event log (`true`, the
    /// default), or only broadcast as a transient event (`false`).
    const PERSIST: bool = true;
}
48
/// A counter that resets on every restart, and guarantees that
/// [`UnordedEventLogId`]s don't conflict with each other.
static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
52
/// A self-allocated ID that is mostly ordered
///
/// The goal here is to avoid concurrent database transaction
/// conflicts due to the ID allocation. Instead they are picked based on
/// a time and a counter, so they are mostly but not strictly ordered and
/// monotonic, and even more importantly: not contiguous.
#[derive(Debug, Encodable, Decodable)]
pub struct UnordedEventLogId {
    /// Time the ID was allocated, in microseconds since the unix epoch
    ts_usecs: u64,
    /// Value of the global ID counter at the time the ID was allocated
    counter: u64,
}
64
65impl UnordedEventLogId {
66    fn new() -> Self {
67        Self {
68            ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
69                // This will never happen
70                .unwrap_or(u64::MAX),
71            counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
72        }
73    }
74}
75
/// Ordered, contiguous ID space, which is easy for event log followers to
/// track.
///
/// The `Default` value (`0`) is the ID used for the first event of an empty
/// log; every following ordered event gets the next value.
#[derive(
    Copy,
    Clone,
    Debug,
    Encodable,
    Decodable,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
)]
pub struct EventLogId(u64);
93
94impl EventLogId {
95    fn next(self) -> EventLogId {
96        Self(self.0 + 1)
97    }
98
99    fn saturating_add(self, rhs: u64) -> EventLogId {
100        Self(self.0.saturating_add(rhs))
101    }
102
103    pub fn saturating_sub(self, rhs: u64) -> EventLogId {
104        Self(self.0.saturating_sub(rhs))
105    }
106
107    pub fn new(log: u64) -> Self {
108        Self(log)
109    }
110}
111
112impl FromStr for EventLogId {
113    type Err = <u64 as FromStr>::Err;
114
115    fn from_str(s: &str) -> Result<Self, Self::Err> {
116        u64::from_str(s).map(Self)
117    }
118}
119
/// Identifier of a kind of event, wrapping a string.
///
/// The string is held in a [`Cow`], so a kind can either borrow a `'static`
/// string (see [`EventKind::from_static`]) or own its string.
#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventKind(Cow<'static, str>);
122
123impl EventKind {
124    pub const fn from_static(value: &'static str) -> Self {
125        Self(Cow::Borrowed(value))
126    }
127}
128
129impl<'s> From<&'s str> for EventKind {
130    fn from(value: &'s str) -> Self {
131        Self(Cow::Owned(value.to_owned()))
132    }
133}
134
135impl From<String> for EventKind {
136    fn from(value: String) -> Self {
137        Self(Cow::Owned(value))
138    }
139}
140
/// An event as stored in the unordered event log, before the ordering task
/// processes it.
#[derive(Debug, Encodable, Decodable, Clone)]
pub struct UnorderedEventLogEntry {
    /// If `true`, the ordering task moves the event into the ordered event
    /// log; otherwise the event is only broadcast as a transient event.
    pub persist: bool,
    /// The event itself
    pub inner: EventLogEntry,
}
146
/// A single event of the event log
#[derive(Debug, Encodable, Decodable, Clone)]
pub struct EventLogEntry {
    /// Type/kind of the event
    ///
    /// Any part of the client is free to self-allocate an identifier, denoting
    /// a certain kind of an event. Notably one event kind can have multiple
    /// instances. E.g. "successful wallet deposit" can be an event kind,
    /// and it can happen multiple times with different payloads.
    pub kind: EventKind,

    /// To prevent accidental conflicts between `kind`s, the module kind the
    /// given event kind belongs to is used as well.
    ///
    /// Note: the meaning of this field is mostly about which part of the code
    /// defines this event kind. Oftentimes a core (non-module)-defined event
    /// will refer in some way to a module. It should use a separate `module_id`
    /// field in the `payload`, instead of this field.
    pub module: Option<(ModuleKind, ModuleInstanceId)>,

    /// Timestamp in microseconds after unix epoch
    ts_usecs: u64,

    /// Event-kind specific payload, typically encoded as a json string for
    /// flexibility.
    pub payload: Vec<u8>,
}
173
// Unordered event log records: `UnordedEventLogId` -> `UnorderedEventLogEntry`
impl_db_record!(
    key = UnordedEventLogId,
    value = UnorderedEventLogEntry,
    db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
);

/// Query prefix for looking up all records of the unordered event log
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct UnorderedEventLogIdPrefixAll;

impl_db_lookup!(
    key = UnordedEventLogId,
    query_prefix = UnorderedEventLogIdPrefixAll
);

/// Query prefix for looking up all records of the ordered event log
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefixAll;

/// Query prefix for ordered event log records, built from an [`EventLogId`]
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefix(EventLogId);

// Ordered event log records: `EventLogId` -> `EventLogEntry`
impl_db_record!(
    key = EventLogId,
    value = EventLogEntry,
    db_prefix = DB_KEY_PREFIX_EVENT_LOG,
);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
203
204#[apply(async_trait_maybe_send!)]
205pub trait DBTransactionEventLogExt {
206    async fn log_event_raw(
207        &mut self,
208        log_ordering_wakeup_tx: watch::Sender<()>,
209        kind: EventKind,
210        module_kind: Option<ModuleKind>,
211        module_id: Option<ModuleInstanceId>,
212        payload: Vec<u8>,
213        persist: bool,
214    );
215
216    /// Log an event log event
217    ///
218    /// The event will start "unordered", but after it is committed an ordering
219    /// task will be notified to "order" it into a final ordered log.
220    async fn log_event<E>(
221        &mut self,
222        log_ordering_wakeup_tx: watch::Sender<()>,
223        module_id: Option<ModuleInstanceId>,
224        event: E,
225    ) where
226        E: Event + Send,
227    {
228        self.log_event_raw(
229            log_ordering_wakeup_tx,
230            E::KIND,
231            E::MODULE,
232            module_id,
233            serde_json::to_vec(&event).expect("Serialization can't fail"),
234            <E as Event>::PERSIST,
235        )
236        .await;
237    }
238
239    /// Next [`EventLogId`] to use for new ordered events.
240    ///
241    /// Used by ordering task, though might be
242    /// useful to get the current count of events.
243    async fn get_next_event_log_id(&mut self) -> EventLogId;
244
245    /// Read a part of the event log.
246    async fn get_event_log(
247        &mut self,
248        pos: Option<EventLogId>,
249        limit: u64,
250    ) -> Vec<(
251        EventLogId,
252        EventKind,
253        Option<(ModuleKind, ModuleInstanceId)>,
254        u64,
255        serde_json::Value,
256    )>;
257}
258
259#[apply(async_trait_maybe_send!)]
260impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
261where
262    Cap: Send,
263{
264    async fn log_event_raw(
265        &mut self,
266        log_ordering_wakeup_tx: watch::Sender<()>,
267        kind: EventKind,
268        module_kind: Option<ModuleKind>,
269        module_id: Option<ModuleInstanceId>,
270        payload: Vec<u8>,
271        persist: bool,
272    ) {
273        assert_eq!(
274            module_kind.is_some(),
275            module_id.is_some(),
276            "Events of modules must have module_id set"
277        );
278
279        let unordered_id = UnordedEventLogId::new();
280        trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
281
282        if self
283            .insert_entry(
284                &unordered_id,
285                &UnorderedEventLogEntry {
286                    persist,
287                    inner: EventLogEntry {
288                        kind,
289                        module: module_kind.map(|kind| (kind, module_id.unwrap())),
290                        ts_usecs: unordered_id.ts_usecs,
291                        payload,
292                    },
293                },
294            )
295            .await
296            .is_some()
297        {
298            panic!("Trying to overwrite event in the client event log");
299        }
300        self.on_commit(move || {
301            let _ = log_ordering_wakeup_tx.send(());
302        });
303    }
304
305    async fn get_next_event_log_id(&mut self) -> EventLogId {
306        self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
307            .await
308            .next()
309            .await
310            .map(|(k, _v)| k.next())
311            .unwrap_or_default()
312    }
313
314    async fn get_event_log(
315        &mut self,
316        pos: Option<EventLogId>,
317        limit: u64,
318    ) -> Vec<(
319        EventLogId,
320        EventKind,
321        Option<(ModuleKind, ModuleInstanceId)>,
322        u64,
323        serde_json::Value,
324    )> {
325        let pos = pos.unwrap_or_default();
326        self.find_by_range(pos..pos.saturating_add(limit))
327            .await
328            .map(|(k, v)| {
329                (
330                    k,
331                    v.kind,
332                    v.module,
333                    v.ts_usecs,
334                    serde_json::from_slice(&v.payload).unwrap_or_default(),
335                )
336            })
337            .collect()
338            .await
339    }
340}
341
342/// The code that handles new unordered events and rewriters them fully ordered
343/// into the final event log.
344pub async fn run_event_log_ordering_task(
345    db: Database,
346    mut log_ordering_task_wakeup: watch::Receiver<()>,
347    log_event_added: watch::Sender<()>,
348    log_event_added_transient: broadcast::Sender<EventLogEntry>,
349) {
350    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
351    let mut next_entry_id = db
352        .begin_transaction_nc()
353        .await
354        .get_next_event_log_id()
355        .await;
356
357    loop {
358        let mut dbtx = db.begin_transaction().await;
359
360        let unordered_events = dbtx
361            .find_by_prefix(&UnorderedEventLogIdPrefixAll)
362            .await
363            .collect::<Vec<_>>()
364            .await;
365        trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
366
367        for (unordered_id, entry) in &unordered_events {
368            assert!(
369                dbtx.remove_entry(unordered_id).await.is_some(),
370                "Must never fail to remove entry"
371            );
372            if entry.persist {
373                assert!(
374                    dbtx.insert_entry(&next_entry_id, &entry.inner)
375                        .await
376                        .is_none(),
377                    "Must never overwrite existing event"
378                );
379                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
380                next_entry_id = next_entry_id.next();
381            } else {
382                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
383                dbtx.on_commit({
384                    let log_event_added_transient = log_event_added_transient.clone();
385                    let entry = entry.inner.clone();
386
387                    move || {
388                        // we ignore the no-subscribers
389                        let _ = log_event_added_transient.send(entry);
390                    }
391                });
392            }
393        }
394
395        // This thread is the only thread deleting already existing element of unordered
396        // log and inserting new elements into ordered log, so it should never
397        // fail to commit.
398        dbtx.commit_tx().await;
399        if !unordered_events.is_empty() {
400            let _ = log_event_added.send(());
401        }
402
403        trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
404        if log_ordering_task_wakeup.changed().await.is_err() {
405            break;
406        }
407    }
408
409    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
410}
411
412pub async fn handle_events<F, R, K>(
413    db: Database,
414    pos_key: &K,
415    mut log_event_added: watch::Receiver<()>,
416    call_fn: F,
417) -> anyhow::Result<()>
418where
419    K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
420    K: DatabaseRecord<Value = EventLogId>,
421    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
422    R: Future<Output = anyhow::Result<()>>,
423{
424    let mut next_key: EventLogId = db
425        .begin_transaction_nc()
426        .await
427        .get_value(pos_key)
428        .await
429        .unwrap_or_default();
430
431    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
432
433    loop {
434        let mut dbtx = db.begin_transaction().await;
435
436        if let Some(event) = dbtx.get_value(&next_key).await {
437            (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
438
439            next_key = next_key.next();
440            dbtx.insert_entry(pos_key, &next_key).await;
441
442            dbtx.commit_tx().await;
443        } else if log_event_added.changed().await.is_err() {
444            break Ok(());
445        }
446    }
447}
448
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU8;
    use std::sync::Arc;

    use anyhow::bail;
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::db::IRawDatabaseExt as _;
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::impl_db_record;
    use fedimint_core::task::TaskGroup;
    use tokio::sync::{broadcast, watch};
    use tokio::try_join;
    use tracing::info;

    use super::{
        handle_events, run_event_log_ordering_task, DBTransactionEventLogExt as _, EventLogId,
    };
    use crate::EventKind;

    /// DB key under which the test event handler stores its log position
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub struct TestLogIdKey;

    impl_db_record!(key = TestLogIdKey, value = EventLogId, db_prefix = 0x00,);

    /// Logs events with kinds "0" to "4" and checks that `handle_events`
    /// sees them in exactly that order.
    #[test_log::test(tokio::test)]
    async fn sanity_handle_events() {
        let db = MemDatabase::new().into_database();
        let tg = TaskGroup::new();

        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
            broadcast::channel(1024);

        tg.spawn_cancellable(
            "event log ordering task",
            run_event_log_ordering_task(
                db.clone(),
                log_ordering_wakeup_rx,
                log_event_added_tx,
                log_event_added_transient_tx,
            ),
        );

        // Number of events handled so far; also the kind expected next
        let counter = Arc::new(AtomicU8::new(0));

        // The handler never finishes on its own, so it returns an error on
        // the last event to end the `try_join!`; the result is ignored.
        let _ = try_join!(
            handle_events(
                db.clone(),
                &TestLogIdKey,
                log_event_added_rx,
                move |_dbtx, event| {
                    let counter = counter.clone();
                    Box::pin(async move {
                        info!("{event:?}");

                        assert_eq!(
                            event.kind,
                            EventKind::from(format!(
                                "{}",
                                counter.load(std::sync::atomic::Ordering::Relaxed)
                            ))
                        );

                        if counter.load(std::sync::atomic::Ordering::Relaxed) == 4 {
                            bail!("Time to wrap up");
                        }
                        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        Ok(())
                    })
                },
            ),
            async {
                // Log each event in its own committed transaction
                for i in 0..=4 {
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.log_event_raw(
                        log_ordering_wakeup_tx.clone(),
                        EventKind::from(format!("{i}")),
                        None,
                        None,
                        vec![],
                        true,
                    )
                    .await;

                    dbtx.commit_tx().await;
                }

                Ok(())
            }
        );
    }
}