simln_lib/
lib.rs

1use async_trait::async_trait;
2use bitcoin::secp256k1::PublicKey;
3use bitcoin::Network;
4use csv::WriterBuilder;
5use lightning::ln::features::NodeFeatures;
6use lightning::ln::PaymentHash;
7use rand::rngs::StdRng;
8use rand::{Rng, RngCore, SeedableRng};
9use rand_chacha::ChaCha8Rng;
10use random_activity::RandomActivityError;
11use serde::{Deserialize, Serialize};
12use std::collections::HashSet;
13use std::fmt::{Display, Formatter};
14use std::marker::Send;
15use std::path::PathBuf;
16use std::sync::Mutex as StdMutex;
17use std::time::{SystemTimeError, UNIX_EPOCH};
18use std::{collections::HashMap, sync::Arc, time::SystemTime};
19use thiserror::Error;
20use tokio::sync::mpsc::{channel, Receiver, Sender};
21use tokio::sync::Mutex;
22use tokio::task::JoinSet;
23use tokio::{select, time, time::Duration};
24use triggered::{Listener, Trigger};
25
26use self::defined_activity::DefinedPaymentActivity;
27use self::random_activity::{NetworkGraphView, RandomPaymentActivity};
28
29pub mod cln;
30mod defined_activity;
31pub mod lnd;
32mod random_activity;
33mod serializers;
34pub mod sim_node;
35#[cfg(test)]
36mod test_utils;
37
/// Connection details for a single node used by the simulator.
///
/// The enum is (de)serialized untagged, so the variant is picked by whichever connection shape the input
/// matches rather than by an explicit tag.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum NodeConnection {
    /// Connection parameters described by [`lnd::LndConnection`].
    LND(lnd::LndConnection),
    /// Connection parameters described by [`cln::ClnConnection`].
    CLN(cln::ClnConnection),
}
44
/// Identifies a node either by its public key or by an alias.
#[derive(Serialize, Debug, Clone)]
pub enum NodeId {
    /// The node is identified by its public key.
    PublicKey(PublicKey),
    /// The node is identified by an alias string.
    Alias(String),
}
50
51impl NodeId {
52    pub fn validate(&self, node_id: &PublicKey, alias: &mut String) -> Result<(), LightningError> {
53        match self {
54            crate::NodeId::PublicKey(pk) => {
55                if pk != node_id {
56                    return Err(LightningError::ValidationError(format!(
57                        "The provided node id does not match the one returned by the backend ({} != {}).",
58                        pk, node_id
59                    )));
60                }
61            },
62            crate::NodeId::Alias(a) => {
63                if alias != a {
64                    log::warn!(
65                        "The provided alias does not match the one returned by the backend ({} != {}).",
66                        a,
67                        alias
68                    )
69                }
70                *alias = a.to_string();
71            },
72        }
73        Ok(())
74    }
75
76    pub fn get_pk(&self) -> Result<&PublicKey, String> {
77        if let NodeId::PublicKey(pk) = self {
78            Ok(pk)
79        } else {
80            Err("NodeId is not a PublicKey".to_string())
81        }
82    }
83}
84
85impl std::fmt::Display for NodeId {
86    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
87        write!(
88            f,
89            "{}",
90            match self {
91                NodeId::PublicKey(pk) => pk.to_string(),
92                NodeId::Alias(a) => a.to_owned(),
93            }
94        )
95    }
96}
97
/// A short channel ID wrapped in a newtype so that a custom `Display` implementation can be attached to it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub struct ShortChannelID(u64);

/// Wraps a raw `u64` as a `ShortChannelID`.
impl From<u64> for ShortChannelID {
    fn from(value: u64) -> Self {
        Self(value)
    }
}

/// Unwraps a `ShortChannelID` back into its raw `u64`.
impl From<ShortChannelID> for u64 {
    fn from(scid: ShortChannelID) -> Self {
        let ShortChannelID(raw) = scid;
        raw
    }
}

/// Formats the ID as three colon-separated fields: the top 24 bits, the middle 24 bits and the low 16 bits.
///
/// See https://github.com/lightning/bolts/blob/60de4a09727c20dea330f9ee8313034de6e50594/07-routing-gossip.md#definition-of-short_channel_id.
impl std::fmt::Display for ShortChannelID {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let raw = self.0;
        let high = (raw >> 40) as u32;
        let middle = ((raw >> 16) & 0xFF_FFFF) as u32;
        let low = (raw & 0xFFFF) as u16;
        write!(f, "{}:{}:{}", high, middle, low)
    }
}
128
/// Top-level (de)serializable simulation parameters: the nodes to connect to and, optionally, the activity
/// to run between them.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SimParams {
    /// Connection details for each node.
    pub nodes: Vec<NodeConnection>,
    /// Activity descriptions. Falls back to an empty list when the field is absent from the input.
    #[serde(default)]
    pub activity: Vec<ActivityParser>,
}
135
/// Either a value or a range parsed from the simulation file.
///
/// Deserialized untagged: the variant is chosen by the shape of the input rather than by an explicit tag.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ValueOrRange<T> {
    /// A single fixed value.
    Value(T),
    /// A pair of bounds; [`ValueOrRange::value`] samples from the half-open range `first..second`.
    Range(T, T),
}
143
144impl<T> ValueOrRange<T>
145where
146    T: std::cmp::PartialOrd + rand_distr::uniform::SampleUniform + Copy,
147{
148    /// Get the enclosed value. If value is defined as a range, sample from it uniformly at random.
149    pub fn value(&self) -> T {
150        match self {
151            ValueOrRange::Value(x) => *x,
152            ValueOrRange::Range(x, y) => {
153                let mut rng = rand::thread_rng();
154                rng.gen_range(*x..*y)
155            },
156        }
157    }
158}
159
160impl<T> Display for ValueOrRange<T>
161where
162    T: Display,
163{
164    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
165        match self {
166            ValueOrRange::Value(x) => write!(f, "{x}"),
167            ValueOrRange::Range(x, y) => write!(f, "({x}-{y})"),
168        }
169    }
170}
171
/// The payment amount in msat. Either a fixed value or a range to sample from.
type Amount = ValueOrRange<u64>;
/// The interval between payments, in seconds. Either a fixed value or a range to sample from.
type Interval = ValueOrRange<u16>;
176
/// Data structure used to parse information from the simulation file. It allows source and destination to be
/// [NodeId], which enables the use of public keys and aliases in the simulation description.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivityParser {
    /// The source of the payment.
    #[serde(with = "serializers::serde_node_id")]
    pub source: NodeId,
    /// The destination of the payment.
    #[serde(with = "serializers::serde_node_id")]
    pub destination: NodeId,
    /// The time in the simulation, in seconds, at which to start the payment.
    pub start_secs: Option<u16>,
    /// The number of payments to send over the course of the simulation. `None` when absent from the input.
    #[serde(default)]
    pub count: Option<u64>,
    /// The interval of the event, i.e. how many seconds elapse between payments.
    #[serde(with = "serializers::serde_value_or_range")]
    pub interval_secs: Interval,
    /// The amount, in msat, to use for this payment.
    #[serde(with = "serializers::serde_value_or_range")]
    pub amount_msat: Amount,
}
199
/// Data structure used internally by the simulator. Both source and destination are represented as [NodeInfo]
/// here (which carries the node's public key, alias and features).
/// This is constructed during activity validation and passed along to the [Simulation].
#[derive(Debug, Clone)]
pub struct ActivityDefinition {
    /// The source of the payment.
    pub source: NodeInfo,
    /// The destination of the payment.
    pub destination: NodeInfo,
    /// The time in the simulation, in seconds, at which to start the payment.
    pub start_secs: Option<u16>,
    /// The number of payments to send over the course of the simulation.
    pub count: Option<u64>,
    /// The interval of the event, i.e. how many seconds elapse between payments.
    pub interval_secs: Interval,
    /// The amount, in msat, to use for this payment.
    pub amount_msat: Amount,
}
217
/// Errors surfaced by the simulator.
#[derive(Debug, Error)]
pub enum SimulationError {
    /// Wraps a [`LightningError`]; a `From` conversion is derived for it.
    #[error("Lightning Error: {0:?}")]
    LightningError(#[from] LightningError),
    /// A simulation task failed; carries no further detail.
    #[error("TaskError")]
    TaskError,
    /// Wraps a [`csv::Error`]; a `From` conversion is derived for it.
    #[error("CSV Error: {0:?}")]
    CsvError(#[from] csv::Error),
    /// A file-related failure; carries no further detail.
    #[error("File Error")]
    FileError,
    /// Wraps a [`RandomActivityError`] and displays it verbatim.
    #[error("{0}")]
    RandomActivityError(RandomActivityError),
    /// A simulated-network failure described by the enclosed message.
    #[error("Simulated Network Error: {0}")]
    SimulatedNetworkError(String),
    /// Wraps a [`SystemTimeError`]; a `From` conversion is derived for it.
    #[error("System Time Error: {0}")]
    SystemTimeError(#[from] SystemTimeError),
    /// A required node could not be found; described by the enclosed message.
    #[error("Missing Node Error: {0}")]
    MissingNodeError(String),
    /// A channel send/receive failure described by the enclosed message.
    #[error("Mpsc Channel Error: {0}")]
    MpscChannelError(String),
    /// Wraps a [`PaymentGenerationError`].
    #[error("Payment Generation Error: {0}")]
    PaymentGenerationError(PaymentGenerationError),
    /// Wraps a [`DestinationGenerationError`].
    #[error("Destination Generation Error: {0}")]
    DestinationGenerationError(DestinationGenerationError),
}
243
/// Errors returned by [`LightningNode`] operations and by configuration validation. Variants that hold a
/// `String` carry a human-readable description of the failure.
#[derive(Debug, Error)]
pub enum LightningError {
    /// Connecting to a node failed.
    #[error("Node connection error: {0}")]
    ConnectionError(String),
    /// Fetching a node's own info failed.
    #[error("Get info error: {0}")]
    GetInfoError(String),
    /// Sending a payment failed.
    #[error("Send payment error: {0}")]
    SendPaymentError(String),
    /// Tracking a payment failed.
    #[error("Track payment error: {0}")]
    TrackPaymentError(String),
    /// A payment hash was not valid.
    #[error("Invalid payment hash")]
    InvalidPaymentHash,
    /// Looking up another node's info failed.
    #[error("Get node info error: {0}")]
    GetNodeInfoError(String),
    /// The configuration did not pass validation.
    #[error("Config validation failed: {0}")]
    ValidationError(String),
    /// An error reported as permanent.
    #[error("Permanent error: {0:?}")]
    PermanentError(String),
    /// Listing channels failed.
    #[error("List channels error: {0}")]
    ListChannelsError(String),
}
265
/// Information about a lightning node.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    /// The node's public key.
    pub pubkey: PublicKey,
    /// The node's alias; may be empty.
    pub alias: String,
    /// The features of the node (e.g. consulted to check keysend support).
    pub features: NodeFeatures,
}
272
273impl Display for NodeInfo {
274    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
275        let pk = self.pubkey.to_string();
276        let pk_summary = format!("{}...{}", &pk[..6], &pk[pk.len() - 6..]);
277        if self.alias.is_empty() {
278            write!(f, "{}", pk_summary)
279        } else {
280            write!(f, "{}({})", self.alias, pk_summary)
281        }
282    }
283}
284
/// LightningNode represents the functionality that is required to execute events on a lightning node.
///
/// Implementations must be `Send`; the simulator stores nodes as `Arc<Mutex<dyn LightningNode>>`.
#[async_trait]
pub trait LightningNode: Send {
    /// Get information about the node.
    fn get_info(&self) -> &NodeInfo;
    /// Get the network this node is running on.
    async fn get_network(&mut self) -> Result<Network, LightningError>;
    /// Keysend payment worth `amount_msat` from a source node to the destination node.
    /// Returns the hash of the payment on success.
    async fn send_payment(
        &mut self,
        dest: PublicKey,
        amount_msat: u64,
    ) -> Result<PaymentHash, LightningError>;
    /// Track a payment with the specified hash. A `shutdown` listener is passed in alongside the hash.
    async fn track_payment(
        &mut self,
        hash: &PaymentHash,
        shutdown: Listener,
    ) -> Result<PaymentResult, LightningError>;
    /// Gets information on a specific node.
    async fn get_node_info(&mut self, node_id: &PublicKey) -> Result<NodeInfo, LightningError>;
    /// Lists all channels, at present only returns a vector of channel capacities in msat because no further
    /// information is required.
    async fn list_channels(&mut self) -> Result<Vec<u64>, LightningError>;
}
310
/// Error returned by [`DestinationGenerator::choose_destination`], wrapping a descriptive message.
#[derive(Debug, Error)]
#[error("Destination generation error: {0}")]
pub struct DestinationGenerationError(String);
314
/// Picks the destination node for payments dispatched by a given source node.
pub trait DestinationGenerator: Send {
    /// choose_destination picks a destination node within the network, returning the node's information and its
    /// capacity (if available).
    fn choose_destination(
        &self,
        source: PublicKey,
    ) -> Result<(NodeInfo, Option<u64>), DestinationGenerationError>;
}
323
/// Error returned by the fallible [`PaymentGenerator`] methods, wrapping a descriptive message.
#[derive(Debug, Error)]
#[error("Payment generation error: {0}")]
pub struct PaymentGenerationError(String);
327
/// Decides when payments start, how many are made, how long to wait between them and how large they are.
pub trait PaymentGenerator: Display + Send {
    /// Returns the time at which the payments should start, if one is set.
    fn payment_start(&self) -> Option<Duration>;

    /// Returns the number of payments that should be made, if a count is set.
    fn payment_count(&self) -> Option<u64>;

    /// Returns the duration that a node should wait until firing its next payment.
    fn next_payment_wait(&self) -> Result<time::Duration, PaymentGenerationError>;

    /// Returns a payment amount, with a destination capacity optionally provided to inform the amount picked.
    fn payment_amount(
        &self,
        destination_capacity: Option<u64>,
    ) -> Result<u64, PaymentGenerationError>;
}
344
/// The result of a payment: how many HTLCs it involved and how it ended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaymentResult {
    /// Number of HTLCs associated with the payment.
    pub htlc_count: usize,
    /// The outcome of the payment.
    pub payment_outcome: PaymentOutcome,
}
350
351impl PaymentResult {
352    pub fn not_dispatched() -> Self {
353        PaymentResult {
354            htlc_count: 0,
355            payment_outcome: PaymentOutcome::NotDispatched,
356        }
357    }
358
359    pub fn track_payment_failed() -> Self {
360        PaymentResult {
361            htlc_count: 0,
362            payment_outcome: PaymentOutcome::TrackPaymentFailed,
363        }
364    }
365}
366
367impl Display for PaymentResult {
368    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
369        write!(
370            f,
371            "Payment outcome: {:?} with {} htlcs",
372            self.payment_outcome, self.htlc_count
373        )
374    }
375}
376
/// The possible outcomes of a payment, as recorded in a [`PaymentResult`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PaymentOutcome {
    Success,
    RecipientRejected,
    UserAbandoned,
    RetriesExhausted,
    PaymentExpired,
    RouteNotFound,
    UnexpectedError,
    IncorrectPaymentDetails,
    InsufficientBalance,
    Unknown,
    /// Outcome used by [`PaymentResult::not_dispatched`].
    NotDispatched,
    /// Outcome used by [`PaymentResult::track_payment_failed`].
    TrackPaymentFailed,
}
392
/// Describes a payment from a source node to a destination node.
#[derive(Debug, Clone, Copy, Serialize)]
struct Payment {
    /// Pubkey of the source node dispatching the payment.
    source: PublicKey,
    /// Pubkey of the destination node receiving the payment.
    destination: PublicKey,
    /// Amount of the payment in msat.
    amount_msat: u64,
    /// Hash of the payment if it has been successfully dispatched, `None` otherwise.
    #[serde(with = "serializers::serde_option_payment_hash")]
    hash: Option<PaymentHash>,
    /// Time at which the payment was dispatched. Serialized through `serde_millis`.
    #[serde(with = "serde_millis")]
    dispatch_time: SystemTime,
}
409
410impl Display for Payment {
411    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
412        let dispatch_time = self
413            .dispatch_time
414            .duration_since(UNIX_EPOCH)
415            .expect("Failed to compute duration since unix epoch.");
416
417        write!(
418            f,
419            "Payment {} dispatched at {:?} sending {} msat from {} -> {}.",
420            self.hash.map(|h| hex::encode(h.0)).unwrap_or_default(),
421            dispatch_time,
422            self.amount_msat,
423            self.source,
424            self.destination,
425        )
426    }
427}
428
/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone, Debug)]
enum SimulationEvent {
    /// Dispatch a payment of the specified amount (second field) to the node provided (first field).
    /// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`.
    SendPayment(NodeInfo, u64),
}
437
/// SimulationOutput provides the output of a simulation event.
#[derive(Debug, Clone)]
enum SimulationOutput {
    /// Intermediate output for when the simulator has successfully dispatched a payment.
    /// We need to track the result of the payment to report on it.
    SendPaymentSuccess(Payment),
    /// Final output for when the simulator has failed to dispatch a payment.
    /// Report this as the final result of the simulation event.
    SendPaymentFailure(Payment, PaymentResult),
}
448
/// MutRngType is a convenient type alias for any random number generator (RNG) type that
/// allows shared and exclusive access. This is necessary because a single RNG
/// is to be shared across multiple `DestinationGenerator`s and `PaymentGenerator`s
/// for deterministic outcomes.
///
/// **Note**: `StdMutex`, i.e. (`std::sync::Mutex`), is used here to avoid making the traits
/// `DestinationGenerator` and `PaymentGenerator` async. The RNG is type-erased (`dyn RngCore + Send`) so
/// that different concrete generators can sit behind the same alias.
type MutRngType = Arc<StdMutex<dyn RngCore + Send>>;
457
/// Newtype for `MutRngType` to encapsulate and hide implementation details for
/// creating new `MutRngType` types. Provides convenient API for the same purpose.
///
/// Cloning a `MutRng` clones the inner `Arc`, so all clones share one underlying RNG.
#[derive(Clone)]
struct MutRng(MutRngType);
462
463impl MutRng {
464    /// Creates a new MutRng given an optional `u64` argument. If `seed_opt` is `Some`,
465    /// random activity generation in the simulator occurs near-deterministically.
466    /// If it is `None`, activity generation is truly random, and based on a
467    /// non-deterministic source of entropy.
468    pub fn new(seed_opt: Option<u64>) -> Self {
469        if let Some(seed) = seed_opt {
470            Self(Arc::new(StdMutex::new(ChaCha8Rng::seed_from_u64(seed))))
471        } else {
472            Self(Arc::new(StdMutex::new(StdRng::from_entropy())))
473        }
474    }
475}
476
/// Contains the configuration options for our simulation.
#[derive(Clone)]
pub struct SimulationCfg {
    /// Total simulation time. The simulation will run forever if undefined.
    total_time: Option<time::Duration>,
    /// The expected payment size for the network.
    expected_payment_msat: u64,
    /// The number of times that the network sends its total capacity in a month of operation when generating random
    /// activity.
    activity_multiplier: f64,
    /// Configurations for printing results to CSV. Results are not written if this option is None.
    write_results: Option<WriteResults>,
    /// Shared random number generator. Seeded from the configured seed when one is provided, otherwise from
    /// entropy.
    seeded_rng: MutRng,
    /// Results logger that holds the simulation statistics.
    results: Arc<Mutex<PaymentResultLogger>>,
}
494
495impl SimulationCfg {
496    pub fn new(
497        total_time: Option<u32>,
498        expected_payment_msat: u64,
499        activity_multiplier: f64,
500        write_results: Option<WriteResults>,
501        seed: Option<u64>,
502    ) -> Self {
503        Self {
504            total_time: total_time.map(|x| Duration::from_secs(x as u64)),
505            expected_payment_msat,
506            activity_multiplier,
507            write_results,
508            seeded_rng: MutRng::new(seed),
509            results: Arc::new(Mutex::new(PaymentResultLogger::new())),
510        }
511    }
512}
513
/// A simulation: its configuration, the nodes it can act on, the activity to execute and the shutdown
/// trigger/listener pair shared by its tasks.
#[derive(Clone)]
pub struct Simulation {
    /// Config for the simulation itself.
    cfg: SimulationCfg,
    /// The lightning nodes that the simulation can execute events on, keyed by public key.
    nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
    /// The activities to be executed. An empty list signals random activity generation.
    activity: Vec<ActivityDefinition>,
    /// High level triggers used to manage simulation tasks and shutdown.
    shutdown_trigger: Trigger,
    shutdown_listener: Listener,
}
526
/// Settings that control how simulation results are written to CSV.
#[derive(Clone)]
pub struct WriteResults {
    /// Data directory where CSV result files are written.
    pub results_dir: PathBuf,
    /// The number of activity results to batch before printing in CSV.
    pub batch_size: u32,
}
534
/// ExecutorKit contains the components required to spin up an activity configured by the user, to be used to
/// spin up the appropriate producers and consumers for the activity.
struct ExecutorKit {
    /// The node that acts as the source for this activity.
    source_info: NodeInfo,
    /// We use an arc mutex here because some implementations of the trait will be very expensive to clone.
    /// See [NetworkGraphView] for details.
    network_generator: Arc<Mutex<dyn DestinationGenerator>>,
    /// Generator for this activity's payments (start, count, wait and amount).
    payment_generator: Box<dyn PaymentGenerator>,
}
544
545impl Simulation {
546    pub fn new(
547        cfg: SimulationCfg,
548        nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
549        activity: Vec<ActivityDefinition>,
550    ) -> Self {
551        let (shutdown_trigger, shutdown_listener) = triggered::trigger();
552        Self {
553            cfg,
554            nodes,
555            activity,
556            shutdown_trigger,
557            shutdown_listener,
558        }
559    }
560
561    /// validate_activity validates that the user-provided activity description is achievable for the network that
562    /// we're working with. If no activity description is provided, then it ensures that we have configured a network
563    /// that is suitable for random activity generation.
564    async fn validate_activity(&self) -> Result<(), LightningError> {
565        // For now, empty activity signals random activity generation
566        if self.activity.is_empty() {
567            if self.nodes.len() <= 1 {
568                return Err(LightningError::ValidationError(
569                    "At least two nodes required for random activity generation.".to_string(),
570                ));
571            } else {
572                for node in self.nodes.values() {
573                    let node = node.lock().await;
574                    if !node.get_info().features.supports_keysend() {
575                        return Err(LightningError::ValidationError(format!(
576                            "All nodes eligible for random activity generation must support keysend, {} does not",
577                            node.get_info()
578                        )));
579                    }
580                }
581            }
582        }
583
584        for payment_flow in self.activity.iter() {
585            // We need every source node that is configured to execute some activity to be included in our set of
586            // nodes so that we can execute events on it.
587            self.nodes
588                .get(&payment_flow.source.pubkey)
589                .ok_or(LightningError::ValidationError(format!(
590                    "Source node not found, {}",
591                    payment_flow.source,
592                )))?;
593
594            // Destinations must support keysend to be able to receive payments.
595            // Note: validation should be update with a different check if an event is not a payment.
596            if !payment_flow.destination.features.supports_keysend() {
597                return Err(LightningError::ValidationError(format!(
598                    "Destination node does not support keysend, {}",
599                    payment_flow.destination,
600                )));
601            }
602        }
603
604        Ok(())
605    }
606
607    /// validates that the nodes are all on the same network and ensures that we're not running on mainnet.
608    async fn validate_node_network(&self) -> Result<(), LightningError> {
609        if self.nodes.is_empty() {
610            return Err(LightningError::ValidationError(
611                "we don't control any nodes. Specify at least one node in your config file"
612                    .to_string(),
613            ));
614        }
615        let mut running_network = Option::None;
616
617        for node in self.nodes.values() {
618            let network = node.lock().await.get_network().await?;
619            if network == Network::Bitcoin {
620                return Err(LightningError::ValidationError(
621                    "mainnet is not supported".to_string(),
622                ));
623            }
624
625            running_network = running_network.take().or(Some(network));
626            if running_network != Some(network) {
627                return Err(LightningError::ValidationError(format!(
628                    "nodes are not on the same network {}.",
629                    network,
630                )));
631            }
632        }
633
634        log::info!(
635            "Simulation is running on {}.",
636            running_network.expect("Network not provided.")
637        );
638
639        Ok(())
640    }
641
    /// Runs the simulation until it is shut down.
    ///
    /// Steps, in order:
    /// 1. Validate the node networks and the configured activity.
    /// 2. Start the data collection tasks.
    /// 3. Build the activity executors, then dispatch consumers and producers for them.
    /// 4. Spawn a task that triggers shutdown once all producers have finished and, if a total time is
    ///    configured, a task that triggers shutdown when that time elapses.
    /// 5. Wait for every spawned task to exit.
    ///
    /// # Errors
    ///
    /// Propagates validation and setup errors (after shutting down and joining already-started tasks for
    /// the setup steps), and returns [`SimulationError::TaskError`] if any joined task exited with an error.
    pub async fn run(&self) -> Result<(), SimulationError> {
        if let Some(total_time) = self.cfg.total_time {
            log::info!("Running the simulation for {}s.", total_time.as_secs());
        } else {
            log::info!("Running the simulation forever.");
        }

        self.validate_node_network().await?;
        self.validate_activity().await?;

        log::info!(
            "Simulating {} activity on {} nodes.",
            self.activity.len(),
            self.nodes.len()
        );
        let mut tasks = JoinSet::new();

        // Before we start the simulation up, start tasks that will be responsible for gathering simulation data.
        // The event channels are shared across our functionality:
        // - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the
        //   final result of the event that it has taken.
        // - Event Receiver: used by data reporting to receive events that have been simulated that need to be
        //   tracked and recorded.
        let (event_sender, event_receiver) = channel(1);
        self.run_data_collection(event_receiver, &mut tasks);

        // Get an execution kit per activity that we need to generate and spin up consumers for each source node.
        let activities = match self.activity_executors().await {
            Ok(a) => a,
            Err(e) => {
                // If we encounter an error while setting up the activity_executors,
                // we need to shutdown and wait for tasks to finish. We have started background tasks in the
                // run_data_collection function, so we should shut those down before returning.
                self.shutdown();
                while let Some(res) = tasks.join_next().await {
                    if let Err(e) = res {
                        log::error!("Task exited with error: {e}.");
                    }
                }
                return Err(e);
            },
        };
        let consumer_channels = self.dispatch_consumers(
            activities
                .iter()
                .map(|generator| generator.source_info.pubkey)
                .collect(),
            event_sender.clone(),
            &mut tasks,
        );

        // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
        // The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
        let mut producer_tasks = JoinSet::new();
        match self
            .dispatch_producers(activities, consumer_channels, &mut producer_tasks)
            .await
        {
            Ok(_) => {},
            Err(e) => {
                // If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish.
                // We have started background tasks in the run_data_collection function,
                // so we should shut those down before returning.
                self.shutdown();
                while let Some(res) = tasks.join_next().await {
                    if let Err(e) = res {
                        log::error!("Task exited with error: {e}.");
                    }
                }
                return Err(e);
            },
        }

        // Start a task that waits for the producers to finish.
        // If all producers finish, then there is nothing left to do and the simulation can be shutdown.
        let producer_trigger = self.shutdown_trigger.clone();
        tasks.spawn(async move {
            while let Some(res) = producer_tasks.join_next().await {
                if let Err(e) = res {
                    log::error!("Producer exited with error: {e}.");
                }
            }
            log::info!("All producers finished. Shutting down.");
            producer_trigger.trigger()
        });

        // Start a task that will shutdown the simulation if the total_time is met.
        if let Some(total_time) = self.cfg.total_time {
            let t = self.shutdown_trigger.clone();
            let l = self.shutdown_listener.clone();

            tasks.spawn(async move {
                // The timeout only errors when total_time elapses before the shutdown listener fires, in
                // which case this task is the one that triggers shutdown.
                if time::timeout(total_time, l).await.is_err() {
                    log::info!(
                        "Simulation run for {}s. Shutting down.",
                        total_time.as_secs()
                    );
                    t.trigger()
                }
            });
        }

        // We always want to wait for all tasks to exit, so we wait for all of them to exit and track any errors
        // that surface. It's okay if there are multiple and one is overwritten, we just want to know whether we
        // exited with an error or not.
        let mut success = true;
        while let Some(res) = tasks.join_next().await {
            if let Err(e) = res {
                log::error!("Task exited with error: {e}.");
                success = false;
            }
        }

        success.then_some(()).ok_or(SimulationError::TaskError)
    }
757
758    pub fn shutdown(&self) {
759        self.shutdown_trigger.trigger()
760    }
761
762    pub async fn get_total_payments(&self) -> u64 {
763        self.cfg.results.lock().await.total_attempts()
764    }
765
766    pub async fn get_success_rate(&self) -> f64 {
767        self.cfg.results.lock().await.success_rate()
768    }
769
    /// run_data_collection starts the tasks required for the simulation to report on the results of the activity
    /// that it generates. The simulation should report outputs via the receiver that is passed in.
    ///
    /// Three tasks are spawned onto `tasks`:
    /// * a results producer (`produce_simulation_results`) fed by `output_receiver`;
    /// * a results logger (`run_results_logger`) given the shared logger and a 60 second duration;
    /// * a results consumer (`consume_simulation_results`) fed by the producer through an internal channel.
    ///
    /// The producer and consumer trigger shutdown if they exit with an error.
    fn run_data_collection(
        &self,
        output_receiver: Receiver<SimulationOutput>,
        tasks: &mut JoinSet<()>,
    ) {
        let listener = self.shutdown_listener.clone();
        let shutdown = self.shutdown_trigger.clone();
        log::debug!("Setting up simulator data collection.");

        // Create a sender/receiver pair that will be used to report final results of simulation.
        let (results_sender, results_receiver) = channel(1);

        let nodes = self.nodes.clone();
        // psr: produce simulation results
        let psr_listener = listener.clone();
        let psr_shutdown = shutdown.clone();
        tasks.spawn(async move {
            log::debug!("Starting simulation results producer.");
            if let Err(e) =
                produce_simulation_results(nodes, output_receiver, results_sender, psr_listener)
                    .await
            {
                psr_shutdown.trigger();
                log::error!("Produce simulation results exited with error: {e:?}.");
            } else {
                log::debug!("Produce simulation results received shutdown signal.");
            }
        });

        // The same results logger is shared by the periodic logger task and the results consumer task.
        let result_logger = self.cfg.results.clone();

        let result_logger_clone = result_logger.clone();
        let result_logger_listener = listener.clone();
        tasks.spawn(async move {
            log::debug!("Starting results logger.");
            run_results_logger(
                result_logger_listener,
                result_logger_clone,
                Duration::from_secs(60),
            )
            .await;
            log::debug!("Exiting results logger.");
        });

        // csr: consume simulation results
        let csr_write_results = self.cfg.write_results.clone();
        tasks.spawn(async move {
            log::debug!("Starting simulation results consumer.");
            if let Err(e) = consume_simulation_results(
                result_logger,
                results_receiver,
                listener,
                csr_write_results,
            )
            .await
            {
                shutdown.trigger();
                log::error!("Consume simulation results exited with error: {e:?}.");
            } else {
                log::debug!("Consume simulation result received shutdown signal.");
            }
        });

        log::debug!("Simulator data collection set up.");
    }
837
838    async fn activity_executors(&self) -> Result<Vec<ExecutorKit>, SimulationError> {
839        let mut generators = Vec::new();
840
841        // Note: when we allow configuring both defined and random activity, this will no longer be an if/else, we'll
842        // just populate with each type as configured.
843        if !self.activity.is_empty() {
844            for description in self.activity.iter() {
845                let activity_generator = DefinedPaymentActivity::new(
846                    description.destination.clone(),
847                    description
848                        .start_secs
849                        .map(|start| Duration::from_secs(start.into())),
850                    description.count,
851                    description.interval_secs,
852                    description.amount_msat,
853                );
854
855                generators.push(ExecutorKit {
856                    source_info: description.source.clone(),
857                    // Defined activities have very simple generators, so the traits required are implemented on
858                    // a single struct which we just cheaply clone.
859                    network_generator: Arc::new(Mutex::new(activity_generator.clone())),
860                    payment_generator: Box::new(activity_generator),
861                });
862            }
863        } else {
864            generators = self.random_activity_nodes().await?;
865        }
866
867        Ok(generators)
868    }
869
870    /// Returns the list of nodes that are eligible for generating random activity on. This is the subset of nodes
871    /// that have sufficient capacity to generate payments of our expected payment amount.
872    async fn random_activity_nodes(&self) -> Result<Vec<ExecutorKit>, SimulationError> {
873        // Collect capacity of each node from its view of its own channels. Total capacity is divided by two to
874        // avoid double counting capacity (as each node has a counterparty in the channel).
875        let mut generators = Vec::new();
876        let mut active_nodes = HashMap::new();
877
878        // Do a first pass to get the capacity of each node which we need to be able to create a network generator.
879        // While we're at it, we get the node info and store it with capacity to create activity generators in our
880        // second pass.
881        for (pk, node) in self.nodes.iter() {
882            let chan_capacity = node.lock().await.list_channels().await?.iter().sum::<u64>();
883
884            if let Err(e) = RandomPaymentActivity::validate_capacity(
885                chan_capacity,
886                self.cfg.expected_payment_msat,
887            ) {
888                log::warn!("Node: {} not eligible for activity generation: {e}.", *pk);
889                continue;
890            }
891
892            // Don't double count channel capacity because each channel reports the total balance between counter
893            // parities. Track capacity separately to be used for our network generator.
894            let capacity = chan_capacity / 2;
895            let node_info = node.lock().await.get_node_info(pk).await?;
896            active_nodes.insert(node_info.pubkey, (node_info, capacity));
897        }
898
899        let network_generator = Arc::new(Mutex::new(
900            NetworkGraphView::new(
901                active_nodes.values().cloned().collect(),
902                self.cfg.seeded_rng.clone(),
903            )
904            .map_err(SimulationError::RandomActivityError)?,
905        ));
906
907        log::info!(
908            "Created network generator: {}.",
909            network_generator.lock().await
910        );
911
912        for (node_info, capacity) in active_nodes.values() {
913            generators.push(ExecutorKit {
914                source_info: node_info.clone(),
915                network_generator: network_generator.clone(),
916                payment_generator: Box::new(
917                    RandomPaymentActivity::new(
918                        *capacity,
919                        self.cfg.expected_payment_msat,
920                        self.cfg.activity_multiplier,
921                        self.cfg.seeded_rng.clone(),
922                    )
923                    .map_err(SimulationError::RandomActivityError)?,
924                ),
925            });
926        }
927
928        Ok(generators)
929    }
930
931    /// Responsible for spinning up consumer tasks for each node specified in consuming_nodes. Assumes that validation
932    /// has already ensured that we have execution on every nodes listed in consuming_nodes.
933    fn dispatch_consumers(
934        &self,
935        consuming_nodes: HashSet<PublicKey>,
936        output_sender: Sender<SimulationOutput>,
937        tasks: &mut JoinSet<()>,
938    ) -> HashMap<PublicKey, Sender<SimulationEvent>> {
939        let mut channels = HashMap::new();
940
941        for (id, node) in self
942            .nodes
943            .iter()
944            .filter(|(id, _)| consuming_nodes.contains(id))
945        {
946            // For each node we have execution on, we'll create a sender and receiver channel to produce and consumer
947            // events and insert producer in our tracking map. We do not buffer channels as we expect events to clear
948            // quickly.
949            let (sender, receiver) = channel(1);
950            channels.insert(*id, sender.clone());
951
952            // Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull
953            // events from and the results sender to report the events it has triggered for further monitoring.
954            // ce: consume event
955            let ce_listener = self.shutdown_listener.clone();
956            let ce_shutdown = self.shutdown_trigger.clone();
957            let ce_output_sender = output_sender.clone();
958            let ce_node = node.clone();
959            tasks.spawn(async move {
960                let node_info = ce_node.lock().await.get_info().clone();
961                log::debug!("Starting events consumer for {}.", node_info);
962                if let Err(e) =
963                    consume_events(ce_node, receiver, ce_output_sender, ce_listener).await
964                {
965                    ce_shutdown.trigger();
966                    log::error!("Event consumer for node {node_info} exited with error: {e:?}.");
967                } else {
968                    log::debug!("Event consumer for node {node_info} completed successfully.");
969                }
970            });
971        }
972
973        channels
974    }
975
976    /// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present
977    /// for every source node in the set of executors.
978    async fn dispatch_producers(
979        &self,
980        executors: Vec<ExecutorKit>,
981        producer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
982        tasks: &mut JoinSet<()>,
983    ) -> Result<(), SimulationError> {
984        for executor in executors {
985            let sender = producer_channels.get(&executor.source_info.pubkey).ok_or(
986                SimulationError::RandomActivityError(RandomActivityError::ValueError(format!(
987                    "Activity producer for: {} not found.",
988                    executor.source_info.pubkey,
989                ))),
990            )?;
991
992            // pe: produce events
993            let pe_shutdown = self.shutdown_trigger.clone();
994            let pe_listener = self.shutdown_listener.clone();
995            let pe_sender = sender.clone();
996            tasks.spawn(async move {
997                let source = executor.source_info.clone();
998
999                log::info!(
1000                    "Starting activity producer for {}: {}.",
1001                    source,
1002                    executor.payment_generator
1003                );
1004
1005                if let Err(e) = produce_events(
1006                    executor.source_info,
1007                    executor.network_generator,
1008                    executor.payment_generator,
1009                    pe_sender,
1010                    pe_listener,
1011                )
1012                .await
1013                {
1014                    pe_shutdown.trigger();
1015                    log::debug!("Activity producer for {source} exited with error {e}.");
1016                } else {
1017                    log::debug!("Activity producer for {source} completed successfully.");
1018                }
1019            });
1020        }
1021
1022        Ok(())
1023    }
1024}
1025
1026/// events that are crated for a lightning node that we can execute events on. Any output that is generated from the
1027/// event being executed is piped into a channel to handle the result of the event.
1028async fn consume_events(
1029    node: Arc<Mutex<dyn LightningNode>>,
1030    mut receiver: Receiver<SimulationEvent>,
1031    sender: Sender<SimulationOutput>,
1032    listener: Listener,
1033) -> Result<(), SimulationError> {
1034    loop {
1035        select! {
1036            biased;
1037            _ = listener.clone() => {
1038                return Ok(());
1039            },
1040            simulation_event = receiver.recv() => {
1041                if let Some(event) = simulation_event {
1042                    match event {
1043                        SimulationEvent::SendPayment(dest, amt_msat) => {
1044                            let mut node = node.lock().await;
1045
1046                            let mut payment = Payment {
1047                                source: node.get_info().pubkey,
1048                                hash: None,
1049                                amount_msat: amt_msat,
1050                                destination: dest.pubkey,
1051                                dispatch_time: SystemTime::now(),
1052                            };
1053
1054                            let outcome = match node.send_payment(dest.pubkey, amt_msat).await {
1055                                Ok(payment_hash) => {
1056                                    log::debug!(
1057                                        "Send payment: {} -> {}: ({}).",
1058                                        node.get_info(),
1059                                        dest,
1060                                        hex::encode(payment_hash.0)
1061                                    );
1062                                    // We need to track the payment outcome using the payment hash that we have received.
1063                                    payment.hash = Some(payment_hash);
1064                                    SimulationOutput::SendPaymentSuccess(payment)
1065                                }
1066                                Err(e) => {
1067                                    log::error!(
1068                                        "Error while sending payment {} -> {}.",
1069                                        node.get_info(),
1070                                        dest
1071                                    );
1072
1073                                    match e {
1074                                        LightningError::PermanentError(s) => {
1075                                            return Err(SimulationError::LightningError(LightningError::PermanentError(s)));
1076                                        }
1077                                        _ => SimulationOutput::SendPaymentFailure(
1078                                            payment,
1079                                            PaymentResult::not_dispatched(),
1080                                        ),
1081                                    }
1082                                }
1083                            };
1084
1085                            select!{
1086                                biased;
1087                                _ = listener.clone() => {
1088                                    return Ok(())
1089                                }
1090                                send_result = sender.send(outcome.clone()) => {
1091                                    if send_result.is_err() {
1092                                        return Err(SimulationError::MpscChannelError(
1093                                                format!("Error sending simulation output {outcome:?}.")));
1094                                    }
1095                                }
1096                            }
1097                        }
1098                    }
1099                } else {
1100                    return Ok(())
1101                }
1102            }
1103        }
1104    }
1105}
1106
1107/// produce events generates events for the activity description provided. It accepts a shutdown listener so it can
1108/// exit if other threads signal that they have errored out.
1109async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator + ?Sized>(
1110    source: NodeInfo,
1111    network_generator: Arc<Mutex<N>>,
1112    node_generator: Box<A>,
1113    sender: Sender<SimulationEvent>,
1114    listener: Listener,
1115) -> Result<(), SimulationError> {
1116    let mut current_count = 0;
1117    loop {
1118        if let Some(c) = node_generator.payment_count() {
1119            if c == current_count {
1120                log::info!(
1121                    "Payment count has been met for {source}: {c} payments. Stopping the activity."
1122                );
1123                return Ok(());
1124            }
1125        }
1126
1127        let wait = get_payment_delay(current_count, &source, node_generator.as_ref())?;
1128
1129        select! {
1130            biased;
1131            _ = listener.clone() => {
1132                return Ok(());
1133            },
1134            // Wait until our time to next payment has elapsed then execute a random amount payment to a random
1135            // destination.
1136            _ = time::sleep(wait) => {
1137                let (destination, capacity) = network_generator.lock().await.choose_destination(source.pubkey).map_err(SimulationError::DestinationGenerationError)?;
1138
1139                // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
1140                // a payment amount something has gone wrong (because we should have validated that we can always
1141                // generate amounts), so we exit.
1142                let amount = match node_generator.payment_amount(capacity) {
1143                    Ok(amt) => {
1144                        if amt == 0 {
1145                            log::debug!("Skipping zero amount payment for {source} -> {destination}.");
1146                            continue;
1147                        }
1148                        amt
1149                    },
1150                    Err(e) => {
1151                        return Err(SimulationError::PaymentGenerationError(e));
1152                    },
1153                };
1154
1155                log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination);
1156
1157                // Send the payment, exiting if we can no longer send to the consumer.
1158                let event = SimulationEvent::SendPayment(destination.clone(), amount);
1159                select!{
1160                    biased;
1161                    _ = listener.clone() => {
1162                        return Ok(());
1163                    },
1164                    send_result = sender.send(event.clone()) => {
1165                        if send_result.is_err(){
1166                            return Err(SimulationError::MpscChannelError(
1167                                    format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
1168                        }
1169                    },
1170                }
1171
1172                current_count += 1;
1173            },
1174        }
1175    }
1176}
1177
1178/// Gets the wait time for the next payment. If this is the first payment being generated, and a specific start delay
1179/// was set we return a once-off delay. Otherwise, the interval between payments is used.
1180fn get_payment_delay<A: PaymentGenerator + ?Sized>(
1181    call_count: u64,
1182    source: &NodeInfo,
1183    node_generator: &A,
1184) -> Result<Duration, SimulationError> {
1185    // Note: we can't check if let Some() && call_count (syntax not allowed) so we add an additional branch in here.
1186    // The alternative is to call payment_start twice (which is _technically_ fine because it always returns the same
1187    // value), but this approach only costs us a few extra lines so we go for the more verbose approach so that we
1188    // don't have to make any assumptions about the underlying operation of payment_start.
1189    if call_count != 0 {
1190        let wait = node_generator
1191            .next_payment_wait()
1192            .map_err(SimulationError::PaymentGenerationError)?;
1193        log::debug!("Next payment for {source} in {:?}.", wait);
1194        Ok(wait)
1195    } else if let Some(start) = node_generator.payment_start() {
1196        log::debug!(
1197            "First payment for {source} will be after a start delay of {:?}.",
1198            start
1199        );
1200        Ok(start)
1201    } else {
1202        let wait = node_generator
1203            .next_payment_wait()
1204            .map_err(SimulationError::PaymentGenerationError)?;
1205        log::debug!("First payment for {source} in {:?}.", wait);
1206        Ok(wait)
1207    }
1208}
1209
1210async fn consume_simulation_results(
1211    logger: Arc<Mutex<PaymentResultLogger>>,
1212    mut receiver: Receiver<(Payment, PaymentResult)>,
1213    listener: Listener,
1214    write_results: Option<WriteResults>,
1215) -> Result<(), SimulationError> {
1216    let mut writer = match write_results {
1217        Some(res) => {
1218            let duration = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
1219            let file = res
1220                .results_dir
1221                .join(format!("simulation_{:?}.csv", duration));
1222            let writer = WriterBuilder::new().from_path(file)?;
1223            Some((writer, res.batch_size))
1224        },
1225        None => None,
1226    };
1227
1228    let mut counter = 1;
1229
1230    loop {
1231        select! {
1232            biased;
1233            _ = listener.clone() => {
1234                writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| {
1235                    SimulationError::FileError
1236                }))?;
1237                return Ok(());
1238            },
1239            payment_result = receiver.recv() => {
1240                match payment_result {
1241                    Some((details, result)) => {
1242                        logger.lock().await.report_result(&details, &result);
1243                        log::trace!("Resolved dispatched payment: {} with: {}.", details, result);
1244
1245                        if let Some((ref mut w, batch_size)) = writer {
1246                            w.serialize((details, result)).map_err(|e| {
1247                                let _ = w.flush();
1248                                SimulationError::CsvError(e)
1249                            })?;
1250                            counter = counter % batch_size + 1;
1251                            if batch_size == counter {
1252                                w.flush().map_err(|_| {
1253                                    SimulationError::FileError
1254                                })?;
1255                            }
1256                        }
1257                    },
1258                    None => return writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| SimulationError::FileError)),
1259                }
1260            }
1261        }
1262    }
1263}
1264
1265/// PaymentResultLogger is an aggregate logger that will report on a summary of the payments that have been reported.
1266#[derive(Default)]
1267struct PaymentResultLogger {
1268    success_payment: u64,
1269    failed_payment: u64,
1270    total_sent: u64,
1271}
1272
1273impl PaymentResultLogger {
1274    fn new() -> Self {
1275        PaymentResultLogger {
1276            ..Default::default()
1277        }
1278    }
1279
1280    fn report_result(&mut self, details: &Payment, result: &PaymentResult) {
1281        match result.payment_outcome {
1282            PaymentOutcome::Success => self.success_payment += 1,
1283            _ => self.failed_payment += 1,
1284        }
1285
1286        self.total_sent += details.amount_msat;
1287    }
1288
1289    fn total_attempts(&self) -> u64 {
1290        self.success_payment + self.failed_payment
1291    }
1292
1293    fn success_rate(&self) -> f64 {
1294        (self.success_payment as f64 / self.total_attempts() as f64) * 100.0
1295    }
1296}
1297
1298impl Display for PaymentResultLogger {
1299    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1300        write!(
1301            f,
1302            "Processed {} payments sending {} msat total with {:.2}% success rate.",
1303            self.total_attempts(),
1304            self.total_sent,
1305            self.success_rate()
1306        )
1307    }
1308}
1309
1310/// Reports a summary of payment results at a duration specified by `interval`
1311/// Note that `run_results_logger` does not error in any way, thus it has no
1312/// trigger. It listens for triggers to ensure clean exit.
1313async fn run_results_logger(
1314    listener: Listener,
1315    logger: Arc<Mutex<PaymentResultLogger>>,
1316    interval: Duration,
1317) {
1318    log::info!("Summary of results will be reported every {:?}.", interval);
1319
1320    loop {
1321        select! {
1322            biased;
1323            _ = listener.clone() => {
1324                break
1325            }
1326
1327            _ = time::sleep(interval) => {
1328                log::info!("{}", logger.lock().await)
1329            }
1330        }
1331    }
1332}
1333
1334/// produce_results is responsible for receiving the outputs of events that the simulator has taken and
1335/// spinning up a producer that will report the results to our main result consumer. We handle each output
1336/// separately because they can take a long time to resolve (eg, a payment that ends up on chain will take a long
1337/// time to resolve).
1338///
1339/// Note: this producer does not accept a shutdown trigger because it only expects to be dispatched once. In the single
1340/// producer case exit will drop the only sending channel and the receiving channel provided to the consumer will error
1341/// out. In the multiple-producer case, a single producer shutting down does not drop *all* sending channels so the
1342/// consumer will not exit and a trigger is required.
1343async fn produce_simulation_results(
1344    nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
1345    mut output_receiver: Receiver<SimulationOutput>,
1346    results: Sender<(Payment, PaymentResult)>,
1347    listener: Listener,
1348) -> Result<(), SimulationError> {
1349    let mut set = tokio::task::JoinSet::new();
1350
1351    let result = loop {
1352        tokio::select! {
1353            biased;
1354            _ = listener.clone() => {
1355                break Ok(())
1356            },
1357            output = output_receiver.recv() => {
1358                match output {
1359                    Some(simulation_output) => {
1360                        match simulation_output{
1361                            SimulationOutput::SendPaymentSuccess(payment) => {
1362                                if let Some(source_node) = nodes.get(&payment.source) {
1363                                    set.spawn(track_payment_result(
1364                                        source_node.clone(), results.clone(), payment, listener.clone()
1365                                    ));
1366                                } else {
1367                                    break Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source)));
1368                                }
1369                            },
1370                            SimulationOutput::SendPaymentFailure(payment, result) => {
1371                                select!{
1372                                    _ = listener.clone() => {
1373                                        return Ok(());
1374                                    },
1375                                    send_result = results.send((payment, result.clone())) => {
1376                                        if send_result.is_err(){
1377                                            break Err(SimulationError::MpscChannelError(
1378                                                format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.",
1379                                                        payment.hash, payment.dispatch_time),
1380                                            ));
1381                                        }
1382                                    },
1383                                }
1384                            }
1385                        };
1386                    },
1387                    None => break Ok(())
1388                }
1389            }
1390        }
1391    };
1392
1393    log::debug!("Simulation results producer exiting.");
1394    while let Some(res) = set.join_next().await {
1395        if let Err(e) = res {
1396            log::error!("Simulation results producer task exited with error: {e}.");
1397        }
1398    }
1399
1400    result
1401}
1402
1403async fn track_payment_result(
1404    node: Arc<Mutex<dyn LightningNode>>,
1405    results: Sender<(Payment, PaymentResult)>,
1406    payment: Payment,
1407    listener: Listener,
1408) -> Result<(), SimulationError> {
1409    log::trace!("Payment result tracker starting.");
1410
1411    let mut node = node.lock().await;
1412
1413    let res = match payment.hash {
1414        Some(hash) => {
1415            log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0));
1416            let track_payment = node.track_payment(&hash, listener.clone());
1417
1418            match track_payment.await {
1419                Ok(res) => {
1420                    log::debug!(
1421                        "Track payment {} result: {:?}.",
1422                        hex::encode(hash.0),
1423                        res.payment_outcome
1424                    );
1425                    res
1426                },
1427                Err(e) => {
1428                    log::error!("Track payment failed for {}: {e}.", hex::encode(hash.0));
1429                    PaymentResult::track_payment_failed()
1430                },
1431            }
1432        },
1433        // None means that the payment was not dispatched, so we cannot track it.
1434        None => {
1435            log::error!(
1436                "We cannot track a payment that has not been dispatched. Missing payment hash."
1437            );
1438            PaymentResult::not_dispatched()
1439        },
1440    };
1441
1442    select! {
1443        biased;
1444        _ = listener.clone() => {
1445            log::debug!("Track payment result received a shutdown signal.");
1446        },
1447        send_payment_result = results.send((payment, res.clone())) => {
1448            if send_payment_result.is_err() {
1449                return Err(SimulationError::MpscChannelError(
1450                        format!("Failed to send payment result {res} for payment {payment}.")))
1451            }
1452        }
1453    }
1454
1455    log::trace!("Result tracking complete. Payment result tracker exiting.");
1456
1457    Ok(())
1458}
1459
#[cfg(test)]
mod tests {
    use crate::{get_payment_delay, test_utils, MutRng, PaymentGenerationError, PaymentGenerator};
    use mockall::mock;
    use std::fmt;
    use std::time::Duration;

    mock! {
        pub Generator {}

        impl fmt::Display for Generator {
            fn fmt<'a>(&self, f: &mut fmt::Formatter<'a>) -> fmt::Result;
        }

        impl PaymentGenerator for Generator {
            fn payment_start(&self) -> Option<Duration>;
            fn payment_count(&self) -> Option<u64>;
            fn next_payment_wait(&self) -> Result<Duration, PaymentGenerationError>;
            fn payment_amount(&self, destination_capacity: Option<u64>) -> Result<u64, PaymentGenerationError>;
        }
    }

    /// Creates a mocked generator that reports `start` as its start delay (at most once) and `interval` as the
    /// wait before every payment.
    fn mocked_generator(start: Option<Duration>, interval: Duration) -> MockGenerator {
        let mut generator = MockGenerator::new();
        generator.expect_payment_start().return_once(move || start);
        generator
            .expect_next_payment_wait()
            .returning(move || Ok(interval));
        generator
    }

    /// Generators that are created with the same seed produce the same values.
    #[test]
    fn create_seeded_mut_rng() {
        for seed in [u64::MIN, u64::MAX] {
            let first = MutRng::new(Some(seed));
            let second = MutRng::new(Some(seed));

            let first_value = first.0.lock().unwrap().next_u64();
            let second_value = second.0.lock().unwrap().next_u64();

            assert_eq!(first_value, second_value)
        }
    }

    /// Generators that are created without a seed produce different values.
    #[test]
    fn create_unseeded_mut_rng() {
        let first = MutRng::new(None);
        let second = MutRng::new(None);

        let first_value = first.0.lock().unwrap().next_u64();
        let second_value = second.0.lock().unwrap().next_u64();

        assert_ne!(first_value, second_value)
    }

    /// Without a start delay, the first and the following payments all wait for the payment interval.
    #[test]
    fn test_no_payment_delay() {
        let node = test_utils::create_nodes(1, 100_000)
            .first()
            .unwrap()
            .0
            .clone();

        // No start time, payments every 5 seconds.
        let payment_interval = Duration::from_secs(5);
        let generator = mocked_generator(None, payment_interval);

        for call_count in [0, 1] {
            assert_eq!(
                get_payment_delay(call_count, &node, &generator).unwrap(),
                payment_interval
            );
        }
    }

    /// With a start delay, only the first payment waits for it and the following payments use the interval.
    #[test]
    fn test_payment_delay() {
        let node = test_utils::create_nodes(1, 100_000)
            .first()
            .unwrap()
            .0
            .clone();

        // Start delay and payment interval are given different values so that they can be told apart.
        let start_delay = Duration::from_secs(10);
        let payment_interval = Duration::from_secs(5);
        let generator = mocked_generator(Some(start_delay), payment_interval);

        let expectations = [(0, start_delay), (1, payment_interval)];
        for (call_count, expected) in expectations {
            assert_eq!(
                get_payment_delay(call_count, &node, &generator).unwrap(),
                expected
            );
        }
    }
}