#![allow(clippy::needless_lifetimes)]

use std::borrow::Cow;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};

use fedimint_core::core::{ModuleInstanceId, ModuleKind};
use fedimint_core::db::{
    Database, DatabaseKey, DatabaseRecord, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
    NonCommittable,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
use fedimint_logging::LOG_CLIENT_EVENT_LOG;
use futures::{Future, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, watch};
use tracing::{debug, trace};

/// DB key prefix for [`UnordedEventLogId`] entries: events that were logged
/// but not yet assigned a final position in the ordered log.
pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
/// DB key prefix for the ordered event log, keyed by [`EventLogId`].
pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;

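/// An event that can be written to the client event log.
///
/// Implementors declare a stable [`EventKind`], the module the event belongs
/// to (if any), and whether it should be persisted or only broadcast to live
/// subscribers. A minimal illustrative implementation (the event type below is
/// hypothetical, not part of this crate) might look like:
///
/// ```ignore
/// #[derive(serde::Serialize, serde::Deserialize)]
/// struct ExampleEvent {
///     value: u64,
/// }
///
/// impl Event for ExampleEvent {
///     const MODULE: Option<ModuleKind> = None;
///     const KIND: EventKind = EventKind::from_static("example-event");
/// }
/// ```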
pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
    const MODULE: Option<ModuleKind>;
    const KIND: EventKind;
    const PERSIST: bool = true;
}

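/// Process-wide counter used to disambiguate [`UnordedEventLogId`]s created
/// within the same microsecond.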
static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);

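/// Key of an event in the unordered section of the log: the timestamp at which
/// it was logged plus a per-process counter, assigned before the ordering task
/// gives the event its final [`EventLogId`].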
#[derive(Debug, Encodable, Decodable)]
pub struct UnordedEventLogId {
    ts_usecs: u64,
    counter: u64,
}

impl UnordedEventLogId {
    fn new() -> Self {
        Self {
            // Saturate instead of panicking if the timestamp ever exceeds what
            // fits in a `u64` of microseconds.
            ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
                .unwrap_or(u64::MAX),
            counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
        }
    }
}

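/// Position of an event in the final, totally ordered event log.
///
/// Assigned sequentially by [`run_event_log_ordering_task`]; callers can use
/// it as a cursor when paging through the log with
/// [`DBTransactionEventLogExt::get_event_log`].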
#[derive(
    Copy,
    Clone,
    Debug,
    Encodable,
    Decodable,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
)]
pub struct EventLogId(u64);

impl EventLogId {
    fn next(self) -> EventLogId {
        Self(self.0 + 1)
    }

    fn saturating_add(self, rhs: u64) -> EventLogId {
        Self(self.0.saturating_add(rhs))
    }

    pub fn saturating_sub(self, rhs: u64) -> EventLogId {
        Self(self.0.saturating_sub(rhs))
    }

    pub fn new(log: u64) -> Self {
        Self(log)
    }
}

impl FromStr for EventLogId {
    type Err = <u64 as FromStr>::Err;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u64::from_str(s).map(Self)
    }
}

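/// Name identifying the type of an event, stored alongside each entry so
/// consumers can match on it without deserializing the payload.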
#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventKind(Cow<'static, str>);

impl EventKind {
    pub const fn from_static(value: &'static str) -> Self {
        Self(Cow::Borrowed(value))
    }
}

impl<'s> From<&'s str> for EventKind {
    fn from(value: &'s str) -> Self {
        Self(Cow::Owned(value.to_owned()))
    }
}

impl From<String> for EventKind {
    fn from(value: String) -> Self {
        Self(Cow::Owned(value))
    }
}

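/// Entry in the unordered section of the log: an [`EventLogEntry`] plus the
/// flag telling the ordering task whether to persist it or only broadcast it
/// to transient subscribers.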
#[derive(Debug, Encodable, Decodable, Clone)]
pub struct UnorderedEventLogEntry {
    pub persist: bool,
    pub inner: EventLogEntry,
}

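/// An event as stored in the event log.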
#[derive(Debug, Encodable, Decodable, Clone)]
pub struct EventLogEntry {
    /// Kind of the event.
    pub kind: EventKind,

    /// Module (kind and instance) that emitted the event, if any.
    pub module: Option<(ModuleKind, ModuleInstanceId)>,

    /// Timestamp in microseconds since the UNIX epoch, taken when the event
    /// was first logged.
    ts_usecs: u64,

    /// Event payload, serialized as JSON.
    pub payload: Vec<u8>,
}

impl_db_record!(
    key = UnordedEventLogId,
    value = UnorderedEventLogEntry,
    db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
);

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct UnorderedEventLogIdPrefixAll;

impl_db_lookup!(
    key = UnordedEventLogId,
    query_prefix = UnorderedEventLogIdPrefixAll
);

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefixAll;

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefix(EventLogId);

impl_db_record!(
    key = EventLogId,
    value = EventLogEntry,
    db_prefix = DB_KEY_PREFIX_EVENT_LOG,
);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);

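/// Extension trait adding event log operations to [`DatabaseTransaction`].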
#[apply(async_trait_maybe_send!)]
pub trait DBTransactionEventLogExt {
    /// Log an event with an already-serialized payload.
    ///
    /// The entry is written under a fresh [`UnordedEventLogId`]; on commit,
    /// `log_ordering_wakeup_tx` is notified so the ordering task can assign it
    /// a final [`EventLogId`].
    async fn log_event_raw(
        &mut self,
        log_ordering_wakeup_tx: watch::Sender<()>,
        kind: EventKind,
        module_kind: Option<ModuleKind>,
        module_id: Option<ModuleInstanceId>,
        payload: Vec<u8>,
        persist: bool,
    );

    /// Log a typed [`Event`], serializing its payload as JSON.
    async fn log_event<E>(
        &mut self,
        log_ordering_wakeup_tx: watch::Sender<()>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
    {
        self.log_event_raw(
            log_ordering_wakeup_tx,
            E::KIND,
            E::MODULE,
            module_id,
            serde_json::to_vec(&event).expect("Serialization can't fail"),
            <E as Event>::PERSIST,
        )
        .await;
    }

    /// Next free position in the ordered event log, i.e. one past the highest
    /// [`EventLogId`] currently stored.
    async fn get_next_event_log_id(&mut self) -> EventLogId;

    /// Read up to `limit` entries of the ordered event log starting at `pos`
    /// (or from the beginning if `pos` is `None`).
    async fn get_event_log(
        &mut self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<(
        EventLogId,
        EventKind,
        Option<(ModuleKind, ModuleInstanceId)>,
        u64,
        serde_json::Value,
    )>;
}

#[apply(async_trait_maybe_send!)]
impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
where
    Cap: Send,
{
    async fn log_event_raw(
        &mut self,
        log_ordering_wakeup_tx: watch::Sender<()>,
        kind: EventKind,
        module_kind: Option<ModuleKind>,
        module_id: Option<ModuleInstanceId>,
        payload: Vec<u8>,
        persist: bool,
    ) {
        assert_eq!(
            module_kind.is_some(),
            module_id.is_some(),
            "Events of modules must have module_id set"
        );

        let unordered_id = UnordedEventLogId::new();
        trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");

        if self
            .insert_entry(
                &unordered_id,
                &UnorderedEventLogEntry {
                    persist,
                    inner: EventLogEntry {
                        kind,
                        module: module_kind.map(|kind| (kind, module_id.unwrap())),
                        ts_usecs: unordered_id.ts_usecs,
                        payload,
                    },
                },
            )
            .await
            .is_some()
        {
            panic!("Trying to overwrite event in the client event log");
        }
        self.on_commit(move || {
            let _ = log_ordering_wakeup_tx.send(());
        });
    }

    async fn get_next_event_log_id(&mut self) -> EventLogId {
        self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
            .await
            .next()
            .await
            .map(|(k, _v)| k.next())
            .unwrap_or_default()
    }

    async fn get_event_log(
        &mut self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<(
        EventLogId,
        EventKind,
        Option<(ModuleKind, ModuleInstanceId)>,
        u64,
        serde_json::Value,
    )> {
        let pos = pos.unwrap_or_default();
        self.find_by_range(pos..pos.saturating_add(limit))
            .await
            .map(|(k, v)| {
                (
                    k,
                    v.kind,
                    v.module,
                    v.ts_usecs,
                    serde_json::from_slice(&v.payload).unwrap_or_default(),
                )
            })
            .collect()
            .await
    }
}

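/// Background task that assigns final positions to logged events.
///
/// Each time it is woken via `log_ordering_task_wakeup`, it drains the
/// unordered section of the log in one database transaction: persistent
/// entries are appended to the ordered log, while transient ones are only
/// broadcast on `log_event_added_transient` after the transaction commits.
/// Whenever any entries were processed, `log_event_added` is notified. The
/// task exits once the wakeup sender is dropped.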
pub async fn run_event_log_ordering_task(
    db: Database,
    mut log_ordering_task_wakeup: watch::Receiver<()>,
    log_event_added: watch::Sender<()>,
    log_event_added_transient: broadcast::Sender<EventLogEntry>,
) {
    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
    let mut next_entry_id = db
        .begin_transaction_nc()
        .await
        .get_next_event_log_id()
        .await;

    loop {
        let mut dbtx = db.begin_transaction().await;

        let unordered_events = dbtx
            .find_by_prefix(&UnorderedEventLogIdPrefixAll)
            .await
            .collect::<Vec<_>>()
            .await;
        trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");

        for (unordered_id, entry) in &unordered_events {
            assert!(
                dbtx.remove_entry(unordered_id).await.is_some(),
                "Must never fail to remove entry"
            );
            if entry.persist {
                assert!(
                    dbtx.insert_entry(&next_entry_id, &entry.inner)
                        .await
                        .is_none(),
                    "Must never overwrite existing event"
                );
                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
                next_entry_id = next_entry_id.next();
            } else {
                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
                dbtx.on_commit({
                    let log_event_added_transient = log_event_added_transient.clone();
                    let entry = entry.inner.clone();

                    move || {
                        let _ = log_event_added_transient.send(entry);
                    }
                });
            }
        }

        dbtx.commit_tx().await;
        if !unordered_events.is_empty() {
            let _ = log_event_added.send(());
        }

        trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
        if log_ordering_task_wakeup.changed().await.is_err() {
            break;
        }
    }

    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
}

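/// Drive `call_fn` over every persisted event in order, resuming from the
/// position stored under `pos_key`.
///
/// Each event is handled in the same database transaction that advances the
/// position, so the handler's database writes and the cursor update commit
/// atomically. Returns when `log_event_added` is closed, or with an error if
/// `call_fn` fails.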
pub async fn handle_events<F, R, K>(
    db: Database,
    pos_key: &K,
    mut log_event_added: watch::Receiver<()>,
    call_fn: F,
) -> anyhow::Result<()>
where
    K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    K: DatabaseRecord<Value = EventLogId>,
    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
    R: Future<Output = anyhow::Result<()>>,
{
    let mut next_key: EventLogId = db
        .begin_transaction_nc()
        .await
        .get_value(pos_key)
        .await
        .unwrap_or_default();

    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");

    loop {
        let mut dbtx = db.begin_transaction().await;

        if let Some(event) = dbtx.get_value(&next_key).await {
            (call_fn)(&mut dbtx.to_ref_nc(), event).await?;

            next_key = next_key.next();
            dbtx.insert_entry(pos_key, &next_key).await;

            dbtx.commit_tx().await;
        } else if log_event_added.changed().await.is_err() {
            break Ok(());
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU8;
    use std::sync::Arc;

    use anyhow::bail;
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::db::IRawDatabaseExt as _;
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::impl_db_record;
    use fedimint_core::task::TaskGroup;
    use tokio::sync::{broadcast, watch};
    use tokio::try_join;
    use tracing::info;

    use super::{
        handle_events, run_event_log_ordering_task, DBTransactionEventLogExt as _, EventLogId,
    };
    use crate::EventKind;

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub struct TestLogIdKey;

    impl_db_record!(key = TestLogIdKey, value = EventLogId, db_prefix = 0x00,);

    #[test_log::test(tokio::test)]
    async fn sanity_handle_events() {
        let db = MemDatabase::new().into_database();
        let tg = TaskGroup::new();

        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
            broadcast::channel(1024);

        tg.spawn_cancellable(
            "event log ordering task",
            run_event_log_ordering_task(
                db.clone(),
                log_ordering_wakeup_rx,
                log_event_added_tx,
                log_event_added_transient_tx,
            ),
        );

        let counter = Arc::new(AtomicU8::new(0));

        // The handler asserts that events arrive in the order they were logged and
        // bails after the fifth one, while the writer task logs five events, each in
        // its own committed transaction.
        let _ = try_join!(
            handle_events(
                db.clone(),
                &TestLogIdKey,
                log_event_added_rx,
                move |_dbtx, event| {
                    let counter = counter.clone();
                    Box::pin(async move {
                        info!("{event:?}");

                        assert_eq!(
                            event.kind,
                            EventKind::from(format!(
                                "{}",
                                counter.load(std::sync::atomic::Ordering::Relaxed)
                            ))
                        );

                        if counter.load(std::sync::atomic::Ordering::Relaxed) == 4 {
                            bail!("Time to wrap up");
                        }
                        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        Ok(())
                    })
                },
            ),
            async {
                for i in 0..=4 {
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.log_event_raw(
                        log_ordering_wakeup_tx.clone(),
                        EventKind::from(format!("{i}")),
                        None,
                        None,
                        vec![],
                        true,
                    )
                    .await;

                    dbtx.commit_tx().await;
                }

                Ok(())
            }
        );
    }
}