1use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};
9
10use crate::{
11 hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
12 UserChannelId,
13};
14
15use crate::config::{may_announce_channel, Config};
16use crate::connection::ConnectionManager;
17use crate::data_store::DataStoreUpdateResult;
18use crate::fee_estimator::ConfirmationTarget;
19use crate::liquidity::LiquiditySource;
20use crate::logger::Logger;
21
22use crate::payment::store::{
23 PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
24};
25
26use crate::io::{
27 EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
28 EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
29};
30use crate::logger::{log_debug, log_error, log_info, LdkLogger};
31
32use lightning::events::bump_transaction::BumpTransactionEvent;
33use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent};
34use lightning::events::{Event as LdkEvent, PaymentFailureReason};
35use lightning::impl_writeable_tlv_based_enum;
36use lightning::ln::channelmanager::PaymentId;
37use lightning::ln::types::ChannelId;
38use lightning::routing::gossip::NodeId;
39use lightning::util::errors::APIError;
40use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
41
42use lightning_types::payment::{PaymentHash, PaymentPreimage};
43
44use lightning_liquidity::lsps2::utils::compute_opening_fee;
45
46use bitcoin::blockdata::locktime::absolute::LockTime;
47use bitcoin::secp256k1::PublicKey;
48use bitcoin::{Amount, OutPoint};
49
50use rand::{thread_rng, Rng};
51
52use core::future::Future;
53use core::task::{Poll, Waker};
54use std::collections::VecDeque;
55use std::ops::Deref;
56use std::sync::{Arc, Condvar, Mutex, RwLock};
57use std::time::Duration;
58
/// An event that the node surfaces to the user through the [`EventQueue`].
///
/// Instances are created by `EventHandler::handle_event` in reaction to LDK events and are
/// persisted together with the queue, using the TLV layout declared for this type via
/// `impl_writeable_tlv_based_enum!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// An outbound payment completed successfully.
    ///
    /// Queued when LDK reports `PaymentSent`, after the payment store entry was updated to
    /// `PaymentStatus::Succeeded`.
    PaymentSuccessful {
        /// Identifier of the payment in the payment store.
        ///
        /// The handler always sets this to `Some`.
        /// NOTE(review): presumably optional for compatibility with older persisted events -
        /// confirm.
        payment_id: Option<PaymentId>,
        /// Hash of the payment.
        payment_hash: PaymentHash,
        /// Preimage of `payment_hash`; the handler always sets this to `Some`.
        payment_preimage: Option<PaymentPreimage>,
        /// Total fee paid, in millisatoshi, as reported by LDK (may be `None`).
        fee_paid_msat: Option<u64>,
    },
    /// An outbound payment failed.
    ///
    /// Queued when LDK reports `PaymentFailed`, after the payment store entry was updated to
    /// `PaymentStatus::Failed`.
    PaymentFailed {
        /// Identifier of the payment in the payment store; the handler always sets `Some`.
        payment_id: Option<PaymentId>,
        /// Hash of the payment, passed through unchanged from the LDK event.
        payment_hash: Option<PaymentHash>,
        /// Failure reason, passed through unchanged from the LDK event.
        reason: Option<PaymentFailureReason>,
    },
    /// An inbound payment was claimed.
    ///
    /// Queued when LDK reports `PaymentClaimed`, after the payment store entry was updated to
    /// `PaymentStatus::Succeeded`.
    PaymentReceived {
        /// Identifier of the payment; the handler derives it from the bytes of the payment hash
        /// and always sets `Some`.
        payment_id: Option<PaymentId>,
        /// Hash of the payment.
        payment_hash: PaymentHash,
        /// Claimed amount in millisatoshi.
        amount_msat: u64,
        /// Custom TLV records taken from the onion fields; empty when none were supplied.
        custom_records: Vec<CustomTlvRecord>,
    },
    /// A payment was forwarded through this node.
    ///
    /// Queued when LDK reports `PaymentForwarded`; unless noted, fields mirror the LDK event.
    PaymentForwarded {
        /// Channel the payment arrived on (the handler panics if LDK did not provide it).
        prev_channel_id: ChannelId,
        /// Channel the payment left on (the handler panics if LDK did not provide it).
        next_channel_id: ChannelId,
        /// User channel id of the inbound channel, if LDK provided one.
        prev_user_channel_id: Option<UserChannelId>,
        /// User channel id of the outbound channel, if LDK provided one.
        next_user_channel_id: Option<UserChannelId>,
        /// Node id of the previous hop, if LDK provided one.
        prev_node_id: Option<PublicKey>,
        /// Node id of the next hop, if LDK provided one.
        next_node_id: Option<PublicKey>,
        /// Total fee earned in millisatoshi; logged as `0` by the handler when absent.
        total_fee_earned_msat: Option<u64>,
        /// Skimmed fee in millisatoshi, as reported by LDK.
        /// NOTE(review): exact semantics are defined by LDK's `PaymentForwarded` - confirm there.
        skimmed_fee_msat: Option<u64>,
        /// Whether the forward was claimed on chain (the handler logs "from claiming onchain").
        claim_from_onchain_tx: bool,
        /// Amount forwarded to the next hop in millisatoshi, if LDK provided it.
        outbound_amount_forwarded_msat: Option<u64>,
    },
    /// An inbound payment is waiting to be claimed.
    ///
    /// Queued for a known `PaymentKind::Bolt11` payment when the LDK `PaymentClaimable` purpose
    /// carries no preimage; in that path the handler returns without claiming the funds itself.
    PaymentClaimable {
        /// Identifier of the payment, derived from the bytes of the payment hash.
        payment_id: PaymentId,
        /// Hash of the payment.
        payment_hash: PaymentHash,
        /// Amount that can be claimed, in millisatoshi.
        claimable_amount_msat: u64,
        /// Claim deadline as reported by LDK.
        /// NOTE(review): presumably a block height - confirm against LDK's `PaymentClaimable`.
        claim_deadline: Option<u32>,
        /// Custom TLV records taken from the onion fields; empty when none were supplied.
        custom_records: Vec<CustomTlvRecord>,
    },
    /// A channel was created and is pending confirmation on chain.
    ///
    /// Queued when LDK reports `ChannelPending`.
    ChannelPending {
        /// Id of the channel.
        channel_id: ChannelId,
        /// Local user channel id of the channel.
        user_channel_id: UserChannelId,
        /// Temporary id the channel had before; the handler unwraps LDK's optional value.
        former_temporary_channel_id: ChannelId,
        /// Node id of the channel counterparty.
        counterparty_node_id: PublicKey,
        /// Funding outpoint of the channel.
        funding_txo: OutPoint,
    },
    /// A channel is ready.
    /// NOTE(review): description inferred from the variant name - confirm against the emitting
    /// handler code.
    ChannelReady {
        /// Id of the channel.
        channel_id: ChannelId,
        /// Local user channel id of the channel.
        user_channel_id: UserChannelId,
        /// Node id of the channel counterparty, if known.
        counterparty_node_id: Option<PublicKey>,
    },
    /// A channel was closed.
    /// NOTE(review): description inferred from the variant name - confirm against the emitting
    /// handler code.
    ChannelClosed {
        /// Id of the channel.
        channel_id: ChannelId,
        /// Local user channel id of the channel.
        user_channel_id: UserChannelId,
        /// Node id of the channel counterparty, if known.
        counterparty_node_id: Option<PublicKey>,
        /// Closure reason, if known.
        reason: Option<ClosureReason>,
    },
}
228
// TLV (de)serialization layout of `Event`.
//
// The first number of each `(N, Variant)` pair identifies the variant; the numbers inside the
// braces are the TLV types of the individual fields, followed by how the field is encoded
// (`required`, `option`, `optional_vec`, `upgradable_option`).
//
// Events are written to the KV store by `EventQueue::persist_queue`, so changing any number here
// changes the encoding of already persisted data.
// NOTE(review): the variant ids do not follow the declaration order of `Event`; presumably they
// reflect the order in which variants were introduced - keep them as they are.
impl_writeable_tlv_based_enum!(Event,
    (0, PaymentSuccessful) => {
        (0, payment_hash, required),
        (1, fee_paid_msat, option),
        (3, payment_id, option),
        (5, payment_preimage, option),
    },
    (1, PaymentFailed) => {
        (0, payment_hash, option),
        (1, reason, upgradable_option),
        (3, payment_id, option),
    },
    (2, PaymentReceived) => {
        (0, payment_hash, required),
        (1, payment_id, option),
        (2, amount_msat, required),
        (3, custom_records, optional_vec),
    },
    (3, ChannelReady) => {
        (0, channel_id, required),
        (1, counterparty_node_id, option),
        (2, user_channel_id, required),
    },
    (4, ChannelPending) => {
        (0, channel_id, required),
        (2, user_channel_id, required),
        (4, former_temporary_channel_id, required),
        (6, counterparty_node_id, required),
        (8, funding_txo, required),
    },
    (5, ChannelClosed) => {
        (0, channel_id, required),
        (1, counterparty_node_id, option),
        (2, user_channel_id, required),
        (3, reason, upgradable_option),
    },
    (6, PaymentClaimable) => {
        (0, payment_hash, required),
        (2, payment_id, required),
        (4, claimable_amount_msat, required),
        (6, claim_deadline, option),
        (7, custom_records, optional_vec),
    },
    (7, PaymentForwarded) => {
        (0, prev_channel_id, required),
        (1, prev_node_id, option),
        (2, next_channel_id, required),
        (3, next_node_id, option),
        (4, prev_user_channel_id, option),
        (6, next_user_channel_id, option),
        (8, total_fee_earned_msat, option),
        (10, skimmed_fee_msat, option),
        (12, claim_from_onchain_tx, required),
        (14, outbound_amount_forwarded_msat, option),
    }
);
285
/// A persisted FIFO queue of [`Event`]s awaiting handling by the user.
///
/// Every mutation (`add_event`, `event_handled`) re-serializes the whole queue and writes it to
/// `kv_store`. Consumers can peek (`next_event`), block (`wait_next_event`) or await
/// (`next_event_async`) the event at the front of the queue.
pub struct EventQueue<L: Deref>
where
    L::Target: LdkLogger,
{
    /// Pending events; the front is the next one to be handled.
    queue: Arc<Mutex<VecDeque<Event>>>,
    /// Waker stored by the most recent `next_event_async` future that found the queue empty.
    waker: Arc<Mutex<Option<Waker>>>,
    /// Signals threads blocked in `wait_next_event` after the queue changed.
    notifier: Condvar,
    /// Store the serialized queue is written to.
    kv_store: Arc<DynStore>,
    /// Logger used to report persistence failures.
    logger: L,
}
296
297impl<L: Deref> EventQueue<L>
298where
299 L::Target: LdkLogger,
300{
301 pub(crate) fn new(kv_store: Arc<DynStore>, logger: L) -> Self {
302 let queue = Arc::new(Mutex::new(VecDeque::new()));
303 let waker = Arc::new(Mutex::new(None));
304 let notifier = Condvar::new();
305 Self { queue, waker, notifier, kv_store, logger }
306 }
307
308 pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
309 {
310 let mut locked_queue = self.queue.lock().unwrap();
311 locked_queue.push_back(event);
312 self.persist_queue(&locked_queue)?;
313 }
314
315 self.notifier.notify_one();
316
317 if let Some(waker) = self.waker.lock().unwrap().take() {
318 waker.wake();
319 }
320 Ok(())
321 }
322
323 pub(crate) fn next_event(&self) -> Option<Event> {
324 let locked_queue = self.queue.lock().unwrap();
325 locked_queue.front().cloned()
326 }
327
328 pub(crate) async fn next_event_async(&self) -> Event {
329 EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
330 }
331
332 pub(crate) fn wait_next_event(&self) -> Event {
333 let locked_queue =
334 self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
335 locked_queue.front().unwrap().clone()
336 }
337
338 pub(crate) fn event_handled(&self) -> Result<(), Error> {
339 {
340 let mut locked_queue = self.queue.lock().unwrap();
341 locked_queue.pop_front();
342 self.persist_queue(&locked_queue)?;
343 }
344 self.notifier.notify_one();
345
346 if let Some(waker) = self.waker.lock().unwrap().take() {
347 waker.wake();
348 }
349 Ok(())
350 }
351
352 fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
353 let data = EventQueueSerWrapper(locked_queue).encode();
354 self.kv_store
355 .write(
356 EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
357 EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
358 EVENT_QUEUE_PERSISTENCE_KEY,
359 &data,
360 )
361 .map_err(|e| {
362 log_error!(
363 self.logger,
364 "Write for key {}/{}/{} failed due to: {}",
365 EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
366 EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
367 EVENT_QUEUE_PERSISTENCE_KEY,
368 e
369 );
370 Error::PersistenceFailed
371 })?;
372 Ok(())
373 }
374}
375
376impl<L: Deref> ReadableArgs<(Arc<DynStore>, L)> for EventQueue<L>
377where
378 L::Target: LdkLogger,
379{
380 #[inline]
381 fn read<R: lightning::io::Read>(
382 reader: &mut R, args: (Arc<DynStore>, L),
383 ) -> Result<Self, lightning::ln::msgs::DecodeError> {
384 let (kv_store, logger) = args;
385 let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
386 let queue = Arc::new(Mutex::new(read_queue.0));
387 let waker = Arc::new(Mutex::new(None));
388 let notifier = Condvar::new();
389 Ok(Self { queue, waker, notifier, kv_store, logger })
390 }
391}
392
/// Owned wrapper used to deserialize the persisted event queue into a `VecDeque<Event>`.
struct EventQueueDeserWrapper(VecDeque<Event>);
394
395impl Readable for EventQueueDeserWrapper {
396 fn read<R: lightning::io::Read>(
397 reader: &mut R,
398 ) -> Result<Self, lightning::ln::msgs::DecodeError> {
399 let len: u16 = Readable::read(reader)?;
400 let mut queue = VecDeque::with_capacity(len as usize);
401 for _ in 0..len {
402 queue.push_back(Readable::read(reader)?);
403 }
404 Ok(Self(queue))
405 }
406}
407
/// Borrowing wrapper used to serialize the event queue without cloning it.
struct EventQueueSerWrapper<'a>(&'a VecDeque<Event>);
409
410impl Writeable for EventQueueSerWrapper<'_> {
411 fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
412 (self.0.len() as u16).write(writer)?;
413 for e in self.0.iter() {
414 e.write(writer)?;
415 }
416 Ok(())
417 }
418}
419
/// Future returned by `EventQueue::next_event_async`.
///
/// It shares the queue and the waker slot with the `EventQueue` that created it.
struct EventFuture {
    /// The queue whose front event this future resolves to.
    event_queue: Arc<Mutex<VecDeque<Event>>>,
    /// Slot in which the task's waker is stored while the queue is empty.
    waker: Arc<Mutex<Option<Waker>>>,
}
424
425impl Future for EventFuture {
426 type Output = Event;
427
428 fn poll(
429 self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
430 ) -> core::task::Poll<Self::Output> {
431 if let Some(event) = self.event_queue.lock().unwrap().front() {
432 Poll::Ready(event.clone())
433 } else {
434 *self.waker.lock().unwrap() = Some(cx.waker().clone());
435 Poll::Pending
436 }
437 }
438}
439
/// Translates LDK events into node-level actions and user-facing [`Event`]s.
///
/// `handle_event` consumes one LDK event at a time, updates the relevant components held here and
/// pushes resulting events to `event_queue`.
pub(crate) struct EventHandler<L: Deref + Clone + Sync + Send + 'static>
where
    L::Target: LdkLogger,
{
    /// Queue that user-facing events are pushed to.
    event_queue: Arc<EventQueue<L>>,
    /// On-chain wallet; used to create funding transactions and to query spendable funds.
    wallet: Arc<Wallet>,
    /// Handler for LDK's bump-transaction events.
    bump_tx_event_handler: Arc<BumpTransactionEventHandler>,
    /// LDK channel manager; used to fund, accept, reject and claim on channels/payments.
    channel_manager: Arc<ChannelManager>,
    /// Manager for peer connections.
    connection_manager: Arc<ConnectionManager<L>>,
    /// Sweeper that is told about spendable outputs reported by LDK.
    output_sweeper: Arc<Sweeper>,
    /// Network graph; used to look up node aliases when logging forwarded payments.
    network_graph: Arc<Graph>,
    /// Optional liquidity source that is notified about failed HTLC handling and forwards.
    liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
    /// Store holding the details of known payments.
    payment_store: Arc<PaymentStore>,
    /// Store holding known peers.
    peer_store: Arc<PeerStore<L>>,
    /// Tokio runtime (if set) on which delayed HTLC forwarding is spawned.
    runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
    /// Logger used by the handler.
    logger: L,
    /// Node configuration; consulted when deciding whether to accept inbound channels.
    config: Arc<Config>,
}
458
459impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
460where
461 L::Target: LdkLogger,
462{
463 pub fn new(
464 event_queue: Arc<EventQueue<L>>, wallet: Arc<Wallet>,
465 bump_tx_event_handler: Arc<BumpTransactionEventHandler>,
466 channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
467 output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
468 liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
469 payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
470 runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: L, config: Arc<Config>,
471 ) -> Self {
472 Self {
473 event_queue,
474 wallet,
475 bump_tx_event_handler,
476 channel_manager,
477 connection_manager,
478 output_sweeper,
479 network_graph,
480 liquidity_source,
481 payment_store,
482 peer_store,
483 logger,
484 runtime,
485 config,
486 }
487 }
488
489 pub async fn handle_event(&self, event: LdkEvent) -> Result<(), ReplayEvent> {
490 match event {
491 LdkEvent::FundingGenerationReady {
492 temporary_channel_id,
493 counterparty_node_id,
494 channel_value_satoshis,
495 output_script,
496 ..
497 } => {
498 let confirmation_target = ConfirmationTarget::ChannelFunding;
501
502 let cur_height = self.channel_manager.current_best_block().height;
504 let locktime = LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO);
505
506 let channel_amount = Amount::from_sat(channel_value_satoshis);
508 match self.wallet.create_funding_transaction(
509 output_script,
510 channel_amount,
511 confirmation_target,
512 locktime,
513 ) {
514 Ok(final_tx) => {
515 match self.channel_manager.funding_transaction_generated(
517 temporary_channel_id,
518 counterparty_node_id,
519 final_tx,
520 ) {
521 Ok(()) => {},
522 Err(APIError::APIMisuseError { err }) => {
523 log_error!(self.logger, "Panicking due to APIMisuseError: {}", err);
524 panic!("APIMisuseError: {}", err);
525 },
526 Err(APIError::ChannelUnavailable { err }) => {
527 log_error!(
528 self.logger,
529 "Failed to process funding transaction as channel went away before we could fund it: {}",
530 err
531 )
532 },
533 Err(err) => {
534 log_error!(
535 self.logger,
536 "Failed to process funding transaction: {:?}",
537 err
538 )
539 },
540 }
541 },
542 Err(err) => {
543 log_error!(self.logger, "Failed to create funding transaction: {}", err);
544 self.channel_manager
545 .force_close_without_broadcasting_txn(
546 &temporary_channel_id,
547 &counterparty_node_id,
548 "Failed to create funding transaction".to_string(),
549 )
550 .unwrap_or_else(|e| {
551 log_error!(self.logger, "Failed to force close channel after funding generation failed: {:?}", e);
552 panic!(
553 "Failed to force close channel after funding generation failed"
554 );
555 });
556 },
557 }
558 },
559 LdkEvent::FundingTxBroadcastSafe { .. } => {
560 debug_assert!(false, "We currently only support safe funding, so this event should never be emitted.");
561 },
562 LdkEvent::PaymentClaimable {
563 payment_hash,
564 purpose,
565 amount_msat,
566 receiver_node_id: _,
567 via_channel_id: _,
568 via_user_channel_id: _,
569 claim_deadline,
570 onion_fields,
571 counterparty_skimmed_fee_msat,
572 payment_id: _,
573 } => {
574 let payment_id = PaymentId(payment_hash.0);
575 if let Some(info) = self.payment_store.get(&payment_id) {
576 if info.direction == PaymentDirection::Outbound {
577 log_info!(
578 self.logger,
579 "Refused inbound payment with ID {}: circular payments are unsupported.",
580 payment_id
581 );
582 self.channel_manager.fail_htlc_backwards(&payment_hash);
583
584 let update = PaymentDetailsUpdate {
585 status: Some(PaymentStatus::Failed),
586 ..PaymentDetailsUpdate::new(payment_id)
587 };
588 match self.payment_store.update(&update) {
589 Ok(_) => return Ok(()),
590 Err(e) => {
591 log_error!(self.logger, "Failed to access payment store: {}", e);
592 return Err(ReplayEvent());
593 },
594 };
595 }
596
597 if info.status == PaymentStatus::Succeeded
598 || matches!(info.kind, PaymentKind::Spontaneous { .. })
599 {
600 log_info!(
601 self.logger,
602 "Refused duplicate inbound payment from payment hash {} of {}msat",
603 hex_utils::to_string(&payment_hash.0),
604 amount_msat,
605 );
606 self.channel_manager.fail_htlc_backwards(&payment_hash);
607
608 let update = PaymentDetailsUpdate {
609 status: Some(PaymentStatus::Failed),
610 ..PaymentDetailsUpdate::new(payment_id)
611 };
612 match self.payment_store.update(&update) {
613 Ok(_) => return Ok(()),
614 Err(e) => {
615 log_error!(self.logger, "Failed to access payment store: {}", e);
616 return Err(ReplayEvent());
617 },
618 };
619 }
620
621 let max_total_opening_fee_msat = match info.kind {
622 PaymentKind::Bolt11Jit { lsp_fee_limits, .. } => {
623 lsp_fee_limits
624 .max_total_opening_fee_msat
625 .or_else(|| {
626 lsp_fee_limits.max_proportional_opening_fee_ppm_msat.and_then(
627 |max_prop_fee| {
628 compute_opening_fee(amount_msat, 0, max_prop_fee)
630 },
631 )
632 })
633 .unwrap_or(0)
634 },
635 _ => 0,
636 };
637
638 if counterparty_skimmed_fee_msat > max_total_opening_fee_msat {
639 log_info!(
640 self.logger,
641 "Refusing inbound payment with hash {} as the counterparty-withheld fee of {}msat exceeds our limit of {}msat",
642 hex_utils::to_string(&payment_hash.0),
643 counterparty_skimmed_fee_msat,
644 max_total_opening_fee_msat,
645 );
646 self.channel_manager.fail_htlc_backwards(&payment_hash);
647
648 let update = PaymentDetailsUpdate {
649 hash: Some(Some(payment_hash)),
650 status: Some(PaymentStatus::Failed),
651 ..PaymentDetailsUpdate::new(payment_id)
652 };
653 match self.payment_store.update(&update) {
654 Ok(_) => return Ok(()),
655 Err(e) => {
656 log_error!(self.logger, "Failed to access payment store: {}", e);
657 return Err(ReplayEvent());
658 },
659 };
660 }
661
662 if counterparty_skimmed_fee_msat > 0 {
664 match info.kind {
665 PaymentKind::Bolt11Jit { .. } => {
666 let update = PaymentDetailsUpdate {
667 counterparty_skimmed_fee_msat: Some(Some(counterparty_skimmed_fee_msat)),
668 ..PaymentDetailsUpdate::new(payment_id)
669 };
670 match self.payment_store.update(&update) {
671 Ok(_) => (),
672 Err(e) => {
673 log_error!(self.logger, "Failed to access payment store: {}", e);
674 return Err(ReplayEvent());
675 },
676 };
677 }
678 _ => debug_assert!(false, "We only expect the counterparty to get away with withholding fees for JIT payments."),
679 }
680 }
681
682 match info.kind {
686 PaymentKind::Bolt11 { preimage, .. } => {
687 if purpose.preimage().is_none() {
688 debug_assert!(
689 preimage.is_none(),
690 "We would have registered the preimage if we knew"
691 );
692
693 let custom_records = onion_fields
694 .map(|cf| {
695 cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect()
696 })
697 .unwrap_or_default();
698 let event = Event::PaymentClaimable {
699 payment_id,
700 payment_hash,
701 claimable_amount_msat: amount_msat,
702 claim_deadline,
703 custom_records,
704 };
705 match self.event_queue.add_event(event) {
706 Ok(_) => return Ok(()),
707 Err(e) => {
708 log_error!(
709 self.logger,
710 "Failed to push to event queue: {}",
711 e
712 );
713 return Err(ReplayEvent());
714 },
715 };
716 }
717 },
718 _ => {},
719 }
720 }
721
722 log_info!(
723 self.logger,
724 "Received payment from payment hash {} of {}msat",
725 hex_utils::to_string(&payment_hash.0),
726 amount_msat,
727 );
728 let payment_preimage = match purpose {
729 PaymentPurpose::Bolt11InvoicePayment { payment_preimage, .. } => {
730 payment_preimage
731 },
732 PaymentPurpose::Bolt12OfferPayment {
733 payment_preimage,
734 payment_secret,
735 payment_context,
736 ..
737 } => {
738 let payer_note = payment_context.invoice_request.payer_note_truncated;
739 let offer_id = payment_context.offer_id;
740 let quantity = payment_context.invoice_request.quantity;
741 let kind = PaymentKind::Bolt12Offer {
742 hash: Some(payment_hash),
743 preimage: payment_preimage,
744 secret: Some(payment_secret),
745 offer_id,
746 payer_note,
747 quantity,
748 };
749
750 let payment = PaymentDetails::new(
751 payment_id,
752 kind,
753 Some(amount_msat),
754 None,
755 PaymentDirection::Inbound,
756 PaymentStatus::Pending,
757 );
758
759 match self.payment_store.insert(payment) {
760 Ok(false) => (),
761 Ok(true) => {
762 log_error!(
763 self.logger,
764 "Bolt12OfferPayment with ID {} was previously known",
765 payment_id,
766 );
767 debug_assert!(false);
768 },
769 Err(e) => {
770 log_error!(
771 self.logger,
772 "Failed to insert payment with ID {}: {}",
773 payment_id,
774 e
775 );
776 debug_assert!(false);
777 },
778 }
779 payment_preimage
780 },
781 PaymentPurpose::Bolt12RefundPayment { payment_preimage, .. } => {
782 payment_preimage
783 },
784 PaymentPurpose::SpontaneousPayment(preimage) => {
785 let kind = PaymentKind::Spontaneous {
787 hash: payment_hash,
788 preimage: Some(preimage),
789 };
790
791 let payment = PaymentDetails::new(
792 payment_id,
793 kind,
794 Some(amount_msat),
795 None,
796 PaymentDirection::Inbound,
797 PaymentStatus::Pending,
798 );
799
800 match self.payment_store.insert(payment) {
801 Ok(false) => (),
802 Ok(true) => {
803 log_error!(
804 self.logger,
805 "Spontaneous payment with ID {} was previously known",
806 payment_id,
807 );
808 debug_assert!(false);
809 },
810 Err(e) => {
811 log_error!(
812 self.logger,
813 "Failed to insert payment with ID {}: {}",
814 payment_id,
815 e
816 );
817 debug_assert!(false);
818 },
819 }
820
821 Some(preimage)
822 },
823 };
824
825 if let Some(preimage) = payment_preimage {
826 self.channel_manager.claim_funds(preimage);
827 } else {
828 log_error!(
829 self.logger,
830 "Failed to claim payment with ID {}: preimage unknown.",
831 payment_id,
832 );
833 self.channel_manager.fail_htlc_backwards(&payment_hash);
834
835 let update = PaymentDetailsUpdate {
836 hash: Some(Some(payment_hash)),
837 status: Some(PaymentStatus::Failed),
838 ..PaymentDetailsUpdate::new(payment_id)
839 };
840 match self.payment_store.update(&update) {
841 Ok(_) => return Ok(()),
842 Err(e) => {
843 log_error!(self.logger, "Failed to access payment store: {}", e);
844 return Err(ReplayEvent());
845 },
846 };
847 }
848 },
849 LdkEvent::PaymentClaimed {
850 payment_hash,
851 purpose,
852 amount_msat,
853 receiver_node_id: _,
854 htlcs: _,
855 sender_intended_total_msat: _,
856 onion_fields,
857 payment_id: _,
858 } => {
859 let payment_id = PaymentId(payment_hash.0);
860 log_info!(
861 self.logger,
862 "Claimed payment with ID {} from payment hash {} of {}msat.",
863 payment_id,
864 hex_utils::to_string(&payment_hash.0),
865 amount_msat,
866 );
867
868 let update = match purpose {
869 PaymentPurpose::Bolt11InvoicePayment {
870 payment_preimage,
871 payment_secret,
872 ..
873 } => PaymentDetailsUpdate {
874 preimage: Some(payment_preimage),
875 secret: Some(Some(payment_secret)),
876 amount_msat: Some(Some(amount_msat)),
877 status: Some(PaymentStatus::Succeeded),
878 ..PaymentDetailsUpdate::new(payment_id)
879 },
880 PaymentPurpose::Bolt12OfferPayment {
881 payment_preimage, payment_secret, ..
882 } => PaymentDetailsUpdate {
883 preimage: Some(payment_preimage),
884 secret: Some(Some(payment_secret)),
885 amount_msat: Some(Some(amount_msat)),
886 status: Some(PaymentStatus::Succeeded),
887 ..PaymentDetailsUpdate::new(payment_id)
888 },
889 PaymentPurpose::Bolt12RefundPayment {
890 payment_preimage,
891 payment_secret,
892 ..
893 } => PaymentDetailsUpdate {
894 preimage: Some(payment_preimage),
895 secret: Some(Some(payment_secret)),
896 amount_msat: Some(Some(amount_msat)),
897 status: Some(PaymentStatus::Succeeded),
898 ..PaymentDetailsUpdate::new(payment_id)
899 },
900 PaymentPurpose::SpontaneousPayment(preimage) => PaymentDetailsUpdate {
901 preimage: Some(Some(preimage)),
902 amount_msat: Some(Some(amount_msat)),
903 status: Some(PaymentStatus::Succeeded),
904 ..PaymentDetailsUpdate::new(payment_id)
905 },
906 };
907
908 match self.payment_store.update(&update) {
909 Ok(DataStoreUpdateResult::Updated) | Ok(DataStoreUpdateResult::Unchanged) => (
910 ),
913 Ok(DataStoreUpdateResult::NotFound) => {
914 log_error!(
915 self.logger,
916 "Claimed payment with ID {} couldn't be found in store",
917 payment_id,
918 );
919 },
920 Err(e) => {
921 log_error!(
922 self.logger,
923 "Failed to update payment with ID {}: {}",
924 payment_id,
925 e
926 );
927 return Err(ReplayEvent());
928 },
929 }
930
931 let event = Event::PaymentReceived {
932 payment_id: Some(payment_id),
933 payment_hash,
934 amount_msat,
935 custom_records: onion_fields
936 .map(|cf| cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect())
937 .unwrap_or_default(),
938 };
939 match self.event_queue.add_event(event) {
940 Ok(_) => return Ok(()),
941 Err(e) => {
942 log_error!(self.logger, "Failed to push to event queue: {}", e);
943 return Err(ReplayEvent());
944 },
945 };
946 },
947 LdkEvent::PaymentSent {
948 payment_id,
949 payment_preimage,
950 payment_hash,
951 fee_paid_msat,
952 ..
953 } => {
954 let payment_id = if let Some(id) = payment_id {
955 id
956 } else {
957 debug_assert!(false, "payment_id should always be set.");
958 return Ok(());
959 };
960
961 let update = PaymentDetailsUpdate {
962 hash: Some(Some(payment_hash)),
963 preimage: Some(Some(payment_preimage)),
964 fee_paid_msat: Some(fee_paid_msat),
965 status: Some(PaymentStatus::Succeeded),
966 ..PaymentDetailsUpdate::new(payment_id)
967 };
968
969 match self.payment_store.update(&update) {
970 Ok(_) => {},
971 Err(e) => {
972 log_error!(self.logger, "Failed to access payment store: {}", e);
973 return Err(ReplayEvent());
974 },
975 };
976
977 self.payment_store.get(&payment_id).map(|payment| {
978 log_info!(
979 self.logger,
980 "Successfully sent payment of {}msat{} from \
981 payment hash {:?} with preimage {:?}",
982 payment.amount_msat.unwrap(),
983 if let Some(fee) = fee_paid_msat {
984 format!(" (fee {} msat)", fee)
985 } else {
986 "".to_string()
987 },
988 hex_utils::to_string(&payment_hash.0),
989 hex_utils::to_string(&payment_preimage.0)
990 );
991 });
992 let event = Event::PaymentSuccessful {
993 payment_id: Some(payment_id),
994 payment_hash,
995 payment_preimage: Some(payment_preimage),
996 fee_paid_msat,
997 };
998
999 match self.event_queue.add_event(event) {
1000 Ok(_) => return Ok(()),
1001 Err(e) => {
1002 log_error!(self.logger, "Failed to push to event queue: {}", e);
1003 return Err(ReplayEvent());
1004 },
1005 };
1006 },
1007 LdkEvent::PaymentFailed { payment_id, payment_hash, reason, .. } => {
1008 log_info!(
1009 self.logger,
1010 "Failed to send payment with ID {} due to {:?}.",
1011 payment_id,
1012 reason
1013 );
1014
1015 let update = PaymentDetailsUpdate {
1016 hash: Some(payment_hash),
1017 status: Some(PaymentStatus::Failed),
1018 ..PaymentDetailsUpdate::new(payment_id)
1019 };
1020 match self.payment_store.update(&update) {
1021 Ok(_) => {},
1022 Err(e) => {
1023 log_error!(self.logger, "Failed to access payment store: {}", e);
1024 return Err(ReplayEvent());
1025 },
1026 };
1027
1028 let event =
1029 Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
1030 match self.event_queue.add_event(event) {
1031 Ok(_) => return Ok(()),
1032 Err(e) => {
1033 log_error!(self.logger, "Failed to push to event queue: {}", e);
1034 return Err(ReplayEvent());
1035 },
1036 };
1037 },
1038
1039 LdkEvent::PaymentPathSuccessful { .. } => {},
1040 LdkEvent::PaymentPathFailed { .. } => {},
1041 LdkEvent::ProbeSuccessful { .. } => {},
1042 LdkEvent::ProbeFailed { .. } => {},
1043 LdkEvent::HTLCHandlingFailed { failed_next_destination, .. } => {
1044 if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1045 liquidity_source.handle_htlc_handling_failed(failed_next_destination);
1046 }
1047 },
1048 LdkEvent::PendingHTLCsForwardable { time_forwardable } => {
1049 let forwarding_channel_manager = self.channel_manager.clone();
1050 let min = time_forwardable.as_millis() as u64;
1051
1052 let runtime_lock = self.runtime.read().unwrap();
1053 debug_assert!(runtime_lock.is_some());
1054
1055 if let Some(runtime) = runtime_lock.as_ref() {
1056 runtime.spawn(async move {
1057 let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1058 tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
1059
1060 forwarding_channel_manager.process_pending_htlc_forwards();
1061 });
1062 }
1063 },
1064 LdkEvent::SpendableOutputs { outputs, channel_id } => {
1065 match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
1066 Ok(_) => return Ok(()),
1067 Err(_) => {
1068 log_error!(self.logger, "Failed to track spendable outputs");
1069 return Err(ReplayEvent());
1070 },
1071 };
1072 },
1073 LdkEvent::OpenChannelRequest {
1074 temporary_channel_id,
1075 counterparty_node_id,
1076 funding_satoshis,
1077 channel_type,
1078 channel_negotiation_type: _,
1079 is_announced,
1080 params: _,
1081 } => {
1082 if is_announced {
1083 if let Err(err) = may_announce_channel(&*self.config) {
1084 log_error!(self.logger, "Rejecting inbound announced channel from peer {} due to missing configuration: {}", counterparty_node_id, err);
1085
1086 self.channel_manager
1087 .force_close_without_broadcasting_txn(
1088 &temporary_channel_id,
1089 &counterparty_node_id,
1090 "Channel request rejected".to_string(),
1091 )
1092 .unwrap_or_else(|e| {
1093 log_error!(self.logger, "Failed to reject channel: {:?}", e)
1094 });
1095 return Ok(());
1096 }
1097 }
1098
1099 let anchor_channel = channel_type.requires_anchors_zero_fee_htlc_tx();
1100 if anchor_channel {
1101 if let Some(anchor_channels_config) =
1102 self.config.anchor_channels_config.as_ref()
1103 {
1104 let cur_anchor_reserve_sats = crate::total_anchor_channels_reserve_sats(
1105 &self.channel_manager,
1106 &self.config,
1107 );
1108 let spendable_amount_sats = self
1109 .wallet
1110 .get_spendable_amount_sats(cur_anchor_reserve_sats)
1111 .unwrap_or(0);
1112
1113 let required_amount_sats = if anchor_channels_config
1114 .trusted_peers_no_reserve
1115 .contains(&counterparty_node_id)
1116 {
1117 0
1118 } else {
1119 anchor_channels_config.per_channel_reserve_sats
1120 };
1121
1122 if spendable_amount_sats < required_amount_sats {
1123 log_error!(
1124 self.logger,
1125 "Rejecting inbound Anchor channel from peer {} due to insufficient available on-chain reserves. Available: {}/{}sats",
1126 counterparty_node_id,
1127 spendable_amount_sats,
1128 required_amount_sats,
1129 );
1130 self.channel_manager
1131 .force_close_without_broadcasting_txn(
1132 &temporary_channel_id,
1133 &counterparty_node_id,
1134 "Channel request rejected".to_string(),
1135 )
1136 .unwrap_or_else(|e| {
1137 log_error!(self.logger, "Failed to reject channel: {:?}", e)
1138 });
1139 return Ok(());
1140 }
1141 } else {
1142 log_error!(
1143 self.logger,
1144 "Rejecting inbound channel from peer {} due to Anchor channels being disabled.",
1145 counterparty_node_id,
1146 );
1147 self.channel_manager
1148 .force_close_without_broadcasting_txn(
1149 &temporary_channel_id,
1150 &counterparty_node_id,
1151 "Channel request rejected".to_string(),
1152 )
1153 .unwrap_or_else(|e| {
1154 log_error!(self.logger, "Failed to reject channel: {:?}", e)
1155 });
1156 return Ok(());
1157 }
1158 }
1159
1160 let user_channel_id: u128 = rand::thread_rng().gen::<u128>();
1161 let allow_0conf = self.config.trusted_peers_0conf.contains(&counterparty_node_id);
1162 let res = if allow_0conf {
1163 self.channel_manager.accept_inbound_channel_from_trusted_peer_0conf(
1164 &temporary_channel_id,
1165 &counterparty_node_id,
1166 user_channel_id,
1167 )
1168 } else {
1169 self.channel_manager.accept_inbound_channel(
1170 &temporary_channel_id,
1171 &counterparty_node_id,
1172 user_channel_id,
1173 )
1174 };
1175
1176 match res {
1177 Ok(()) => {
1178 log_info!(
1179 self.logger,
1180 "Accepting inbound{}{} channel of {}sats from{} peer {}",
1181 if allow_0conf { " 0conf" } else { "" },
1182 if anchor_channel { " Anchor" } else { "" },
1183 funding_satoshis,
1184 if allow_0conf { " trusted" } else { "" },
1185 counterparty_node_id,
1186 );
1187 },
1188 Err(e) => {
1189 log_error!(
1190 self.logger,
1191 "Error while accepting inbound{}{} channel from{} peer {}: {:?}",
1192 if allow_0conf { " 0conf" } else { "" },
1193 if anchor_channel { " Anchor" } else { "" },
1194 counterparty_node_id,
1195 if allow_0conf { " trusted" } else { "" },
1196 e,
1197 );
1198 },
1199 }
1200 },
1201 LdkEvent::PaymentForwarded {
1202 prev_channel_id,
1203 next_channel_id,
1204 prev_user_channel_id,
1205 next_user_channel_id,
1206 prev_node_id,
1207 next_node_id,
1208 total_fee_earned_msat,
1209 skimmed_fee_msat,
1210 claim_from_onchain_tx,
1211 outbound_amount_forwarded_msat,
1212 } => {
1213 let read_only_network_graph = self.network_graph.read_only();
1214 let nodes = read_only_network_graph.nodes();
1215 let channels = self.channel_manager.list_channels();
1216
1217 let node_str = |channel_id: &Option<ChannelId>| {
1218 channel_id
1219 .and_then(|channel_id| channels.iter().find(|c| c.channel_id == channel_id))
1220 .and_then(|channel| {
1221 nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id))
1222 })
1223 .map_or("private_node".to_string(), |node| {
1224 node.announcement_info
1225 .as_ref()
1226 .map_or("unnamed node".to_string(), |ann| {
1227 format!("node {}", ann.alias())
1228 })
1229 })
1230 };
1231 let channel_str = |channel_id: &Option<ChannelId>| {
1232 channel_id
1233 .map(|channel_id| format!(" with channel {}", channel_id))
1234 .unwrap_or_default()
1235 };
1236 let from_prev_str = format!(
1237 " from {}{}",
1238 node_str(&prev_channel_id),
1239 channel_str(&prev_channel_id)
1240 );
1241 let to_next_str =
1242 format!(" to {}{}", node_str(&next_channel_id), channel_str(&next_channel_id));
1243
1244 let fee_earned = total_fee_earned_msat.unwrap_or(0);
1245 if claim_from_onchain_tx {
1246 log_info!(
1247 self.logger,
1248 "Forwarded payment{}{} of {}msat, earning {}msat in fees from claiming onchain.",
1249 from_prev_str,
1250 to_next_str,
1251 outbound_amount_forwarded_msat.unwrap_or(0),
1252 fee_earned,
1253 );
1254 } else {
1255 log_info!(
1256 self.logger,
1257 "Forwarded payment{}{} of {}msat, earning {}msat in fees.",
1258 from_prev_str,
1259 to_next_str,
1260 outbound_amount_forwarded_msat.unwrap_or(0),
1261 fee_earned,
1262 );
1263 }
1264
1265 if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1266 liquidity_source.handle_payment_forwarded(next_channel_id);
1267 }
1268
1269 let event = Event::PaymentForwarded {
1270 prev_channel_id: prev_channel_id.expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."),
1271 next_channel_id: next_channel_id.expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."),
1272 prev_user_channel_id: prev_user_channel_id.map(UserChannelId),
1273 next_user_channel_id: next_user_channel_id.map(UserChannelId),
1274 prev_node_id,
1275 next_node_id,
1276 total_fee_earned_msat,
1277 skimmed_fee_msat,
1278 claim_from_onchain_tx,
1279 outbound_amount_forwarded_msat,
1280 };
1281 self.event_queue.add_event(event).map_err(|e| {
1282 log_error!(self.logger, "Failed to push to event queue: {}", e);
1283 ReplayEvent()
1284 })?;
1285 },
1286 LdkEvent::ChannelPending {
1287 channel_id,
1288 user_channel_id,
1289 former_temporary_channel_id,
1290 counterparty_node_id,
1291 funding_txo,
1292 ..
1293 } => {
1294 log_info!(
1295 self.logger,
1296 "New channel {} with counterparty {} has been created and is pending confirmation on chain.",
1297 channel_id,
1298 counterparty_node_id,
1299 );
1300
1301 let event = Event::ChannelPending {
1302 channel_id,
1303 user_channel_id: UserChannelId(user_channel_id),
1304 former_temporary_channel_id: former_temporary_channel_id.unwrap(),
1305 counterparty_node_id,
1306 funding_txo,
1307 };
1308 match self.event_queue.add_event(event) {
1309 Ok(_) => {},
1310 Err(e) => {
1311 log_error!(self.logger, "Failed to push to event queue: {}", e);
1312 return Err(ReplayEvent());
1313 },
1314 };
1315
1316 let network_graph = self.network_graph.read_only();
1317 let channels =
1318 self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
1319 if let Some(pending_channel) =
1320 channels.into_iter().find(|c| c.channel_id == channel_id)
1321 {
1322 if !pending_channel.is_outbound
1323 && self.peer_store.get_peer(&counterparty_node_id).is_none()
1324 {
1325 if let Some(address) = network_graph
1326 .nodes()
1327 .get(&NodeId::from_pubkey(&counterparty_node_id))
1328 .and_then(|node_info| node_info.announcement_info.as_ref())
1329 .and_then(|ann_info| ann_info.addresses().first())
1330 {
1331 let peer = PeerInfo {
1332 node_id: counterparty_node_id,
1333 address: address.clone(),
1334 };
1335
1336 self.peer_store.add_peer(peer).unwrap_or_else(|e| {
1337 log_error!(
1338 self.logger,
1339 "Failed to add peer {} to peer store: {}",
1340 counterparty_node_id,
1341 e
1342 );
1343 });
1344 }
1345 }
1346 }
1347 },
1348 LdkEvent::ChannelReady {
1349 channel_id, user_channel_id, counterparty_node_id, ..
1350 } => {
1351 log_info!(
1352 self.logger,
1353 "Channel {} with counterparty {} ready to be used.",
1354 channel_id,
1355 counterparty_node_id,
1356 );
1357
1358 if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1359 liquidity_source.handle_channel_ready(
1360 user_channel_id,
1361 &channel_id,
1362 &counterparty_node_id,
1363 );
1364 }
1365
1366 let event = Event::ChannelReady {
1367 channel_id,
1368 user_channel_id: UserChannelId(user_channel_id),
1369 counterparty_node_id: Some(counterparty_node_id),
1370 };
1371 match self.event_queue.add_event(event) {
1372 Ok(_) => {},
1373 Err(e) => {
1374 log_error!(self.logger, "Failed to push to event queue: {}", e);
1375 return Err(ReplayEvent());
1376 },
1377 };
1378 },
1379 LdkEvent::ChannelClosed {
1380 channel_id,
1381 reason,
1382 user_channel_id,
1383 counterparty_node_id,
1384 ..
1385 } => {
1386 log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason);
1387
1388 let event = Event::ChannelClosed {
1389 channel_id,
1390 user_channel_id: UserChannelId(user_channel_id),
1391 counterparty_node_id,
1392 reason: Some(reason),
1393 };
1394
1395 match self.event_queue.add_event(event) {
1396 Ok(_) => {},
1397 Err(e) => {
1398 log_error!(self.logger, "Failed to push to event queue: {}", e);
1399 return Err(ReplayEvent());
1400 },
1401 };
1402 },
1403 LdkEvent::DiscardFunding { .. } => {},
1404 LdkEvent::HTLCIntercepted {
1405 requested_next_hop_scid,
1406 intercept_id,
1407 expected_outbound_amount_msat,
1408 payment_hash,
1409 ..
1410 } => {
1411 if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1412 liquidity_source.handle_htlc_intercepted(
1413 requested_next_hop_scid,
1414 intercept_id,
1415 expected_outbound_amount_msat,
1416 payment_hash,
1417 );
1418 }
1419 },
1420 LdkEvent::InvoiceReceived { .. } => {
1421 debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted.");
1422 },
1423 LdkEvent::ConnectionNeeded { node_id, addresses } => {
1424 let runtime_lock = self.runtime.read().unwrap();
1425 debug_assert!(runtime_lock.is_some());
1426
1427 if let Some(runtime) = runtime_lock.as_ref() {
1428 let spawn_logger = self.logger.clone();
1429 let spawn_cm = Arc::clone(&self.connection_manager);
1430 runtime.spawn(async move {
1431 for addr in &addresses {
1432 match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1433 Ok(()) => {
1434 return;
1435 },
1436 Err(e) => {
1437 log_error!(
1438 spawn_logger,
1439 "Failed to establish connection to peer {}@{}: {}",
1440 node_id,
1441 addr,
1442 e
1443 );
1444 },
1445 }
1446 }
1447 });
1448 }
1449 },
1450 LdkEvent::BumpTransaction(bte) => {
1451 match bte {
1452 BumpTransactionEvent::ChannelClose {
1453 ref channel_id,
1454 ref counterparty_node_id,
1455 ..
1456 } => {
1457 if let Some(anchor_channels_config) =
1459 self.config.anchor_channels_config.as_ref()
1460 {
1461 if anchor_channels_config
1462 .trusted_peers_no_reserve
1463 .contains(counterparty_node_id)
1464 {
1465 log_debug!(self.logger,
1466 "Ignoring BumpTransactionEvent::ChannelClose for channel {} due to trusted counterparty {}",
1467 channel_id, counterparty_node_id
1468 );
1469 return Ok(());
1470 }
1471 }
1472 },
1473 BumpTransactionEvent::HTLCResolution { .. } => {},
1474 }
1475
1476 self.bump_tx_event_handler.handle_event(&bte);
1477 },
1478 LdkEvent::OnionMessageIntercepted { .. } => {
1479 debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1480 },
1481 LdkEvent::OnionMessagePeerConnected { .. } => {
1482 debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1483 },
1484 }
1485 Ok(())
1486 }
1487}
1488
#[cfg(test)]
mod tests {
    use super::*;
    use lightning::util::test_utils::{TestLogger, TestStore};
    use std::sync::atomic::{AtomicU16, Ordering};
    use std::time::Duration;

    /// Exercises the queue with a single event: every accessor must keep returning it until
    /// `event_handled()` is called, the stored bytes must deserialize into a queue yielding the
    /// same event, and the queue must be empty once the event has been marked handled.
    #[tokio::test]
    async fn event_queue_persistence() {
        let kv_store: Arc<DynStore> = Arc::new(TestStore::new(false));
        let test_logger = Arc::new(TestLogger::new());
        let queue = Arc::new(EventQueue::new(Arc::clone(&kv_store), Arc::clone(&test_logger)));
        assert_eq!(queue.next_event(), None);

        let ready_event = Event::ChannelReady {
            channel_id: ChannelId([23u8; 32]),
            user_channel_id: UserChannelId(2323),
            counterparty_node_id: None,
        };
        queue.add_event(ready_event.clone()).unwrap();

        // None of the accessors removes the event, so repeated reads see the same value.
        for _round in 0..5 {
            let via_blocking = queue.wait_next_event();
            assert_eq!(via_blocking, ready_event);

            let via_async = queue.next_event_async().await;
            assert_eq!(via_async, ready_event);

            let via_poll = queue.next_event();
            assert_eq!(via_poll, Some(ready_event.clone()));
        }

        // Read the raw bytes stored under the event-queue key and rebuild a queue from them.
        let read_result = kv_store.read(
            EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
            EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
            EVENT_QUEUE_PERSISTENCE_KEY,
        );
        let persisted_bytes = read_result.unwrap();
        let mut reader = &persisted_bytes[..];
        let restored_queue =
            EventQueue::read(&mut reader, (Arc::clone(&kv_store), test_logger)).unwrap();
        assert_eq!(restored_queue.wait_next_event(), ready_event);

        // Marking the event as handled leaves the original queue empty.
        queue.event_handled().unwrap();
        assert_eq!(queue.next_event(), None);
    }

    /// Exercises the queue under interleaved producers and consumers: first within a single
    /// task via `tokio::select!`, then with a blocking consumer thread, a producer thread and
    /// the async accessor all active at once.
    #[tokio::test]
    async fn event_queue_concurrency() {
        let kv_store: Arc<DynStore> = Arc::new(TestStore::new(false));
        let test_logger = Arc::new(TestLogger::new());
        let queue = Arc::new(EventQueue::new(Arc::clone(&kv_store), Arc::clone(&test_logger)));
        assert_eq!(queue.next_event(), None);

        let ready_event = Event::ChannelReady {
            channel_id: ChannelId([23u8; 32]),
            user_channel_id: UserChannelId(2323),
            counterparty_node_id: None,
        };

        // On an empty queue the async accessor must still be pending when the one-second timer
        // fires; if it resolves first, the test panics.
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            _ = queue.next_event_async() => {
                panic!();
            }
        }

        assert_eq!(queue.next_event(), None);
        // Counters for how many events were pushed and how many were consumed below.
        let enqueued = AtomicU16::new(0);
        let received = AtomicU16::new(0);
        let mut late_event_added = false;

        // Pre-fill the queue.
        for _ in 0..25 {
            queue.add_event(ready_event.clone()).unwrap();
            enqueued.fetch_add(1, Ordering::SeqCst);
        }

        loop {
            tokio::select! {
                // One-off delayed producer: the guard disables this branch after it has run.
                _ = tokio::time::sleep(Duration::from_millis(10)), if !late_event_added => {
                    queue.add_event(ready_event.clone()).unwrap();
                    enqueued.fetch_add(1, Ordering::SeqCst);
                    late_event_added = true;
                }
                // Consumer that also enqueues a replacement event after handling one.
                event = queue.next_event_async() => {
                    assert_eq!(event, ready_event);
                    queue.event_handled().unwrap();
                    received.fetch_add(1, Ordering::SeqCst);

                    queue.add_event(ready_event.clone()).unwrap();
                    enqueued.fetch_add(1, Ordering::SeqCst);
                }
                // Plain consumer.
                event = queue.next_event_async() => {
                    assert_eq!(event, ready_event);
                    queue.event_handled().unwrap();
                    received.fetch_add(1, Ordering::SeqCst);
                }
            }

            // Keep looping until the delayed event was added and every enqueued event has been
            // received.
            if !late_event_added
                || received.load(Ordering::SeqCst) != enqueued.load(Ordering::SeqCst)
            {
                continue;
            }
            break;
        }
        assert_eq!(queue.next_event(), None);

        // Blocking consumer thread: waits for an event, handles it, then signals completion
        // through the watch channel.
        let (done_tx, mut done_rx) = tokio::sync::watch::channel(());
        let consumer_queue = Arc::clone(&queue);
        let consumer_expected = ready_event.clone();
        std::thread::spawn(move || {
            let event = consumer_queue.wait_next_event();
            assert_eq!(event, consumer_expected);
            consumer_queue.event_handled().unwrap();
            done_tx.send(()).unwrap();
        });

        // Producer thread: after a short pause, enqueues the two events the consumers wait for.
        let producer_queue = Arc::clone(&queue);
        let producer_event = ready_event.clone();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(20));
            for _ in 0..2 {
                producer_queue.add_event(producer_event.clone()).unwrap();
            }
        });

        // Async consumer running on the test task.
        assert_eq!(queue.next_event_async().await, ready_event.clone());
        queue.event_handled().unwrap();

        // Wait for the consumer thread's completion signal before checking the queue is empty.
        done_rx.changed().await.unwrap();
        assert_eq!(queue.next_event(), None);
    }
}