1use async_trait::async_trait;
2use bitcoin::secp256k1::PublicKey;
3use bitcoin::Network;
4use csv::WriterBuilder;
5use lightning::ln::features::NodeFeatures;
6use lightning::ln::PaymentHash;
7use rand::rngs::StdRng;
8use rand::{Rng, RngCore, SeedableRng};
9use rand_chacha::ChaCha8Rng;
10use random_activity::RandomActivityError;
11use serde::{Deserialize, Serialize};
12use std::collections::HashSet;
13use std::fmt::{Display, Formatter};
14use std::marker::Send;
15use std::path::PathBuf;
16use std::sync::Mutex as StdMutex;
17use std::time::{SystemTimeError, UNIX_EPOCH};
18use std::{collections::HashMap, sync::Arc, time::SystemTime};
19use thiserror::Error;
20use tokio::sync::mpsc::{channel, Receiver, Sender};
21use tokio::sync::Mutex;
22use tokio::task::JoinSet;
23use tokio::{select, time, time::Duration};
24use triggered::{Listener, Trigger};
25
26use self::defined_activity::DefinedPaymentActivity;
27use self::random_activity::{NetworkGraphView, RandomPaymentActivity};
28
29pub mod cln;
30mod defined_activity;
31pub mod lnd;
32mod random_activity;
33mod serializers;
34pub mod sim_node;
35#[cfg(test)]
36mod test_utils;
37
38#[derive(Serialize, Deserialize, Debug, Clone)]
39#[serde(untagged)]
40pub enum NodeConnection {
41 LND(lnd::LndConnection),
42 CLN(cln::ClnConnection),
43}
44
45#[derive(Serialize, Debug, Clone)]
46pub enum NodeId {
47 PublicKey(PublicKey),
48 Alias(String),
49}
50
51impl NodeId {
52 pub fn validate(&self, node_id: &PublicKey, alias: &mut String) -> Result<(), LightningError> {
53 match self {
54 crate::NodeId::PublicKey(pk) => {
55 if pk != node_id {
56 return Err(LightningError::ValidationError(format!(
57 "The provided node id does not match the one returned by the backend ({} != {}).",
58 pk, node_id
59 )));
60 }
61 },
62 crate::NodeId::Alias(a) => {
63 if alias != a {
64 log::warn!(
65 "The provided alias does not match the one returned by the backend ({} != {}).",
66 a,
67 alias
68 )
69 }
70 *alias = a.to_string();
71 },
72 }
73 Ok(())
74 }
75
76 pub fn get_pk(&self) -> Result<&PublicKey, String> {
77 if let NodeId::PublicKey(pk) = self {
78 Ok(pk)
79 } else {
80 Err("NodeId is not a PublicKey".to_string())
81 }
82 }
83}
84
85impl std::fmt::Display for NodeId {
86 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
87 write!(
88 f,
89 "{}",
90 match self {
91 NodeId::PublicKey(pk) => pk.to_string(),
92 NodeId::Alias(a) => a.to_owned(),
93 }
94 )
95 }
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
100pub struct ShortChannelID(u64);
101
102impl From<u64> for ShortChannelID {
104 fn from(value: u64) -> Self {
105 ShortChannelID(value)
106 }
107}
108
109impl From<ShortChannelID> for u64 {
111 fn from(scid: ShortChannelID) -> Self {
112 scid.0
113 }
114}
115
116impl std::fmt::Display for ShortChannelID {
118 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119 write!(
120 f,
121 "{}:{}:{}",
122 (self.0 >> 40) as u32,
123 ((self.0 >> 16) & 0xFFFFFF) as u32,
124 (self.0 & 0xFFFF) as u16,
125 )
126 }
127}
128
129#[derive(Debug, Serialize, Deserialize, Clone)]
130pub struct SimParams {
131 pub nodes: Vec<NodeConnection>,
132 #[serde(default)]
133 pub activity: Vec<ActivityParser>,
134}
135
136#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
138#[serde(untagged)]
139pub enum ValueOrRange<T> {
140 Value(T),
141 Range(T, T),
142}
143
144impl<T> ValueOrRange<T>
145where
146 T: std::cmp::PartialOrd + rand_distr::uniform::SampleUniform + Copy,
147{
148 pub fn value(&self) -> T {
150 match self {
151 ValueOrRange::Value(x) => *x,
152 ValueOrRange::Range(x, y) => {
153 let mut rng = rand::thread_rng();
154 rng.gen_range(*x..*y)
155 },
156 }
157 }
158}
159
160impl<T> Display for ValueOrRange<T>
161where
162 T: Display,
163{
164 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
165 match self {
166 ValueOrRange::Value(x) => write!(f, "{x}"),
167 ValueOrRange::Range(x, y) => write!(f, "({x}-{y})"),
168 }
169 }
170}
171
172type Amount = ValueOrRange<u64>;
174type Interval = ValueOrRange<u16>;
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
180pub struct ActivityParser {
181 #[serde(with = "serializers::serde_node_id")]
183 pub source: NodeId,
184 #[serde(with = "serializers::serde_node_id")]
186 pub destination: NodeId,
187 pub start_secs: Option<u16>,
189 #[serde(default)]
191 pub count: Option<u64>,
192 #[serde(with = "serializers::serde_value_or_range")]
194 pub interval_secs: Interval,
195 #[serde(with = "serializers::serde_value_or_range")]
197 pub amount_msat: Amount,
198}
199
200#[derive(Debug, Clone)]
203pub struct ActivityDefinition {
204 pub source: NodeInfo,
206 pub destination: NodeInfo,
208 pub start_secs: Option<u16>,
210 pub count: Option<u64>,
212 pub interval_secs: Interval,
214 pub amount_msat: Amount,
216}
217
218#[derive(Debug, Error)]
219pub enum SimulationError {
220 #[error("Lightning Error: {0:?}")]
221 LightningError(#[from] LightningError),
222 #[error("TaskError")]
223 TaskError,
224 #[error("CSV Error: {0:?}")]
225 CsvError(#[from] csv::Error),
226 #[error("File Error")]
227 FileError,
228 #[error("{0}")]
229 RandomActivityError(RandomActivityError),
230 #[error("Simulated Network Error: {0}")]
231 SimulatedNetworkError(String),
232 #[error("System Time Error: {0}")]
233 SystemTimeError(#[from] SystemTimeError),
234 #[error("Missing Node Error: {0}")]
235 MissingNodeError(String),
236 #[error("Mpsc Channel Error: {0}")]
237 MpscChannelError(String),
238 #[error("Payment Generation Error: {0}")]
239 PaymentGenerationError(PaymentGenerationError),
240 #[error("Destination Generation Error: {0}")]
241 DestinationGenerationError(DestinationGenerationError),
242}
243
244#[derive(Debug, Error)]
245pub enum LightningError {
246 #[error("Node connection error: {0}")]
247 ConnectionError(String),
248 #[error("Get info error: {0}")]
249 GetInfoError(String),
250 #[error("Send payment error: {0}")]
251 SendPaymentError(String),
252 #[error("Track payment error: {0}")]
253 TrackPaymentError(String),
254 #[error("Invalid payment hash")]
255 InvalidPaymentHash,
256 #[error("Get node info error: {0}")]
257 GetNodeInfoError(String),
258 #[error("Config validation failed: {0}")]
259 ValidationError(String),
260 #[error("Permanent error: {0:?}")]
261 PermanentError(String),
262 #[error("List channels error: {0}")]
263 ListChannelsError(String),
264}
265
266#[derive(Debug, Clone)]
267pub struct NodeInfo {
268 pub pubkey: PublicKey,
269 pub alias: String,
270 pub features: NodeFeatures,
271}
272
273impl Display for NodeInfo {
274 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
275 let pk = self.pubkey.to_string();
276 let pk_summary = format!("{}...{}", &pk[..6], &pk[pk.len() - 6..]);
277 if self.alias.is_empty() {
278 write!(f, "{}", pk_summary)
279 } else {
280 write!(f, "{}({})", self.alias, pk_summary)
281 }
282 }
283}
284
285#[async_trait]
287pub trait LightningNode: Send {
288 fn get_info(&self) -> &NodeInfo;
290 async fn get_network(&mut self) -> Result<Network, LightningError>;
292 async fn send_payment(
294 &mut self,
295 dest: PublicKey,
296 amount_msat: u64,
297 ) -> Result<PaymentHash, LightningError>;
298 async fn track_payment(
300 &mut self,
301 hash: &PaymentHash,
302 shutdown: Listener,
303 ) -> Result<PaymentResult, LightningError>;
304 async fn get_node_info(&mut self, node_id: &PublicKey) -> Result<NodeInfo, LightningError>;
306 async fn list_channels(&mut self) -> Result<Vec<u64>, LightningError>;
309}
310
311#[derive(Debug, Error)]
312#[error("Destination generation error: {0}")]
313pub struct DestinationGenerationError(String);
314
315pub trait DestinationGenerator: Send {
316 fn choose_destination(
319 &self,
320 source: PublicKey,
321 ) -> Result<(NodeInfo, Option<u64>), DestinationGenerationError>;
322}
323
324#[derive(Debug, Error)]
325#[error("Payment generation error: {0}")]
326pub struct PaymentGenerationError(String);
327
328pub trait PaymentGenerator: Display + Send {
329 fn payment_start(&self) -> Option<Duration>;
331
332 fn payment_count(&self) -> Option<u64>;
334
335 fn next_payment_wait(&self) -> Result<time::Duration, PaymentGenerationError>;
337
338 fn payment_amount(
340 &self,
341 destination_capacity: Option<u64>,
342 ) -> Result<u64, PaymentGenerationError>;
343}
344
345#[derive(Debug, Clone, Serialize, Deserialize)]
346pub struct PaymentResult {
347 pub htlc_count: usize,
348 pub payment_outcome: PaymentOutcome,
349}
350
351impl PaymentResult {
352 pub fn not_dispatched() -> Self {
353 PaymentResult {
354 htlc_count: 0,
355 payment_outcome: PaymentOutcome::NotDispatched,
356 }
357 }
358
359 pub fn track_payment_failed() -> Self {
360 PaymentResult {
361 htlc_count: 0,
362 payment_outcome: PaymentOutcome::TrackPaymentFailed,
363 }
364 }
365}
366
367impl Display for PaymentResult {
368 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
369 write!(
370 f,
371 "Payment outcome: {:?} with {} htlcs",
372 self.payment_outcome, self.htlc_count
373 )
374 }
375}
376
377#[derive(Debug, Clone, Serialize, Deserialize)]
378pub enum PaymentOutcome {
379 Success,
380 RecipientRejected,
381 UserAbandoned,
382 RetriesExhausted,
383 PaymentExpired,
384 RouteNotFound,
385 UnexpectedError,
386 IncorrectPaymentDetails,
387 InsufficientBalance,
388 Unknown,
389 NotDispatched,
390 TrackPaymentFailed,
391}
392
393#[derive(Debug, Clone, Copy, Serialize)]
395struct Payment {
396 source: PublicKey,
398 destination: PublicKey,
400 amount_msat: u64,
402 #[serde(with = "serializers::serde_option_payment_hash")]
404 hash: Option<PaymentHash>,
405 #[serde(with = "serde_millis")]
407 dispatch_time: SystemTime,
408}
409
410impl Display for Payment {
411 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
412 let dispatch_time = self
413 .dispatch_time
414 .duration_since(UNIX_EPOCH)
415 .expect("Failed to compute duration since unix epoch.");
416
417 write!(
418 f,
419 "Payment {} dispatched at {:?} sending {} msat from {} -> {}.",
420 self.hash.map(|h| hex::encode(h.0)).unwrap_or_default(),
421 dispatch_time,
422 self.amount_msat,
423 self.source,
424 self.destination,
425 )
426 }
427}
428
429#[derive(Clone, Debug)]
432enum SimulationEvent {
433 SendPayment(NodeInfo, u64),
436}
437
438#[derive(Debug, Clone)]
440enum SimulationOutput {
441 SendPaymentSuccess(Payment),
444 SendPaymentFailure(Payment, PaymentResult),
447}
448
449type MutRngType = Arc<StdMutex<dyn RngCore + Send>>;
457
458#[derive(Clone)]
461struct MutRng(MutRngType);
462
463impl MutRng {
464 pub fn new(seed_opt: Option<u64>) -> Self {
469 if let Some(seed) = seed_opt {
470 Self(Arc::new(StdMutex::new(ChaCha8Rng::seed_from_u64(seed))))
471 } else {
472 Self(Arc::new(StdMutex::new(StdRng::from_entropy())))
473 }
474 }
475}
476
477#[derive(Clone)]
479pub struct SimulationCfg {
480 total_time: Option<time::Duration>,
482 expected_payment_msat: u64,
484 activity_multiplier: f64,
487 write_results: Option<WriteResults>,
489 seeded_rng: MutRng,
491 results: Arc<Mutex<PaymentResultLogger>>,
493}
494
495impl SimulationCfg {
496 pub fn new(
497 total_time: Option<u32>,
498 expected_payment_msat: u64,
499 activity_multiplier: f64,
500 write_results: Option<WriteResults>,
501 seed: Option<u64>,
502 ) -> Self {
503 Self {
504 total_time: total_time.map(|x| Duration::from_secs(x as u64)),
505 expected_payment_msat,
506 activity_multiplier,
507 write_results,
508 seeded_rng: MutRng::new(seed),
509 results: Arc::new(Mutex::new(PaymentResultLogger::new())),
510 }
511 }
512}
513
514#[derive(Clone)]
515pub struct Simulation {
516 cfg: SimulationCfg,
518 nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
520 activity: Vec<ActivityDefinition>,
522 shutdown_trigger: Trigger,
524 shutdown_listener: Listener,
525}
526
527#[derive(Clone)]
528pub struct WriteResults {
529 pub results_dir: PathBuf,
531 pub batch_size: u32,
533}
534
535struct ExecutorKit {
538 source_info: NodeInfo,
539 network_generator: Arc<Mutex<dyn DestinationGenerator>>,
542 payment_generator: Box<dyn PaymentGenerator>,
543}
544
545impl Simulation {
546 pub fn new(
547 cfg: SimulationCfg,
548 nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
549 activity: Vec<ActivityDefinition>,
550 ) -> Self {
551 let (shutdown_trigger, shutdown_listener) = triggered::trigger();
552 Self {
553 cfg,
554 nodes,
555 activity,
556 shutdown_trigger,
557 shutdown_listener,
558 }
559 }
560
561 async fn validate_activity(&self) -> Result<(), LightningError> {
565 if self.activity.is_empty() {
567 if self.nodes.len() <= 1 {
568 return Err(LightningError::ValidationError(
569 "At least two nodes required for random activity generation.".to_string(),
570 ));
571 } else {
572 for node in self.nodes.values() {
573 let node = node.lock().await;
574 if !node.get_info().features.supports_keysend() {
575 return Err(LightningError::ValidationError(format!(
576 "All nodes eligible for random activity generation must support keysend, {} does not",
577 node.get_info()
578 )));
579 }
580 }
581 }
582 }
583
584 for payment_flow in self.activity.iter() {
585 self.nodes
588 .get(&payment_flow.source.pubkey)
589 .ok_or(LightningError::ValidationError(format!(
590 "Source node not found, {}",
591 payment_flow.source,
592 )))?;
593
594 if !payment_flow.destination.features.supports_keysend() {
597 return Err(LightningError::ValidationError(format!(
598 "Destination node does not support keysend, {}",
599 payment_flow.destination,
600 )));
601 }
602 }
603
604 Ok(())
605 }
606
607 async fn validate_node_network(&self) -> Result<(), LightningError> {
609 if self.nodes.is_empty() {
610 return Err(LightningError::ValidationError(
611 "we don't control any nodes. Specify at least one node in your config file"
612 .to_string(),
613 ));
614 }
615 let mut running_network = Option::None;
616
617 for node in self.nodes.values() {
618 let network = node.lock().await.get_network().await?;
619 if network == Network::Bitcoin {
620 return Err(LightningError::ValidationError(
621 "mainnet is not supported".to_string(),
622 ));
623 }
624
625 running_network = running_network.take().or(Some(network));
626 if running_network != Some(network) {
627 return Err(LightningError::ValidationError(format!(
628 "nodes are not on the same network {}.",
629 network,
630 )));
631 }
632 }
633
634 log::info!(
635 "Simulation is running on {}.",
636 running_network.expect("Network not provided.")
637 );
638
639 Ok(())
640 }
641
642 pub async fn run(&self) -> Result<(), SimulationError> {
643 if let Some(total_time) = self.cfg.total_time {
644 log::info!("Running the simulation for {}s.", total_time.as_secs());
645 } else {
646 log::info!("Running the simulation forever.");
647 }
648
649 self.validate_node_network().await?;
650 self.validate_activity().await?;
651
652 log::info!(
653 "Simulating {} activity on {} nodes.",
654 self.activity.len(),
655 self.nodes.len()
656 );
657 let mut tasks = JoinSet::new();
658
659 let (event_sender, event_receiver) = channel(1);
666 self.run_data_collection(event_receiver, &mut tasks);
667
668 let activities = match self.activity_executors().await {
670 Ok(a) => a,
671 Err(e) => {
672 self.shutdown();
676 while let Some(res) = tasks.join_next().await {
677 if let Err(e) = res {
678 log::error!("Task exited with error: {e}.");
679 }
680 }
681 return Err(e);
682 },
683 };
684 let consumer_channels = self.dispatch_consumers(
685 activities
686 .iter()
687 .map(|generator| generator.source_info.pubkey)
688 .collect(),
689 event_sender.clone(),
690 &mut tasks,
691 );
692
693 let mut producer_tasks = JoinSet::new();
696 match self
697 .dispatch_producers(activities, consumer_channels, &mut producer_tasks)
698 .await
699 {
700 Ok(_) => {},
701 Err(e) => {
702 self.shutdown();
706 while let Some(res) = tasks.join_next().await {
707 if let Err(e) = res {
708 log::error!("Task exited with error: {e}.");
709 }
710 }
711 return Err(e);
712 },
713 }
714
715 let producer_trigger = self.shutdown_trigger.clone();
718 tasks.spawn(async move {
719 while let Some(res) = producer_tasks.join_next().await {
720 if let Err(e) = res {
721 log::error!("Producer exited with error: {e}.");
722 }
723 }
724 log::info!("All producers finished. Shutting down.");
725 producer_trigger.trigger()
726 });
727
728 if let Some(total_time) = self.cfg.total_time {
730 let t = self.shutdown_trigger.clone();
731 let l = self.shutdown_listener.clone();
732
733 tasks.spawn(async move {
734 if time::timeout(total_time, l).await.is_err() {
735 log::info!(
736 "Simulation run for {}s. Shutting down.",
737 total_time.as_secs()
738 );
739 t.trigger()
740 }
741 });
742 }
743
744 let mut success = true;
748 while let Some(res) = tasks.join_next().await {
749 if let Err(e) = res {
750 log::error!("Task exited with error: {e}.");
751 success = false;
752 }
753 }
754
755 success.then_some(()).ok_or(SimulationError::TaskError)
756 }
757
758 pub fn shutdown(&self) {
759 self.shutdown_trigger.trigger()
760 }
761
762 pub async fn get_total_payments(&self) -> u64 {
763 self.cfg.results.lock().await.total_attempts()
764 }
765
766 pub async fn get_success_rate(&self) -> f64 {
767 self.cfg.results.lock().await.success_rate()
768 }
769
770 fn run_data_collection(
773 &self,
774 output_receiver: Receiver<SimulationOutput>,
775 tasks: &mut JoinSet<()>,
776 ) {
777 let listener = self.shutdown_listener.clone();
778 let shutdown = self.shutdown_trigger.clone();
779 log::debug!("Setting up simulator data collection.");
780
781 let (results_sender, results_receiver) = channel(1);
783
784 let nodes = self.nodes.clone();
785 let psr_listener = listener.clone();
787 let psr_shutdown = shutdown.clone();
788 tasks.spawn(async move {
789 log::debug!("Starting simulation results producer.");
790 if let Err(e) =
791 produce_simulation_results(nodes, output_receiver, results_sender, psr_listener)
792 .await
793 {
794 psr_shutdown.trigger();
795 log::error!("Produce simulation results exited with error: {e:?}.");
796 } else {
797 log::debug!("Produce simulation results received shutdown signal.");
798 }
799 });
800
801 let result_logger = self.cfg.results.clone();
802
803 let result_logger_clone = result_logger.clone();
804 let result_logger_listener = listener.clone();
805 tasks.spawn(async move {
806 log::debug!("Starting results logger.");
807 run_results_logger(
808 result_logger_listener,
809 result_logger_clone,
810 Duration::from_secs(60),
811 )
812 .await;
813 log::debug!("Exiting results logger.");
814 });
815
816 let csr_write_results = self.cfg.write_results.clone();
818 tasks.spawn(async move {
819 log::debug!("Starting simulation results consumer.");
820 if let Err(e) = consume_simulation_results(
821 result_logger,
822 results_receiver,
823 listener,
824 csr_write_results,
825 )
826 .await
827 {
828 shutdown.trigger();
829 log::error!("Consume simulation results exited with error: {e:?}.");
830 } else {
831 log::debug!("Consume simulation result received shutdown signal.");
832 }
833 });
834
835 log::debug!("Simulator data collection set up.");
836 }
837
838 async fn activity_executors(&self) -> Result<Vec<ExecutorKit>, SimulationError> {
839 let mut generators = Vec::new();
840
841 if !self.activity.is_empty() {
844 for description in self.activity.iter() {
845 let activity_generator = DefinedPaymentActivity::new(
846 description.destination.clone(),
847 description
848 .start_secs
849 .map(|start| Duration::from_secs(start.into())),
850 description.count,
851 description.interval_secs,
852 description.amount_msat,
853 );
854
855 generators.push(ExecutorKit {
856 source_info: description.source.clone(),
857 network_generator: Arc::new(Mutex::new(activity_generator.clone())),
860 payment_generator: Box::new(activity_generator),
861 });
862 }
863 } else {
864 generators = self.random_activity_nodes().await?;
865 }
866
867 Ok(generators)
868 }
869
870 async fn random_activity_nodes(&self) -> Result<Vec<ExecutorKit>, SimulationError> {
873 let mut generators = Vec::new();
876 let mut active_nodes = HashMap::new();
877
878 for (pk, node) in self.nodes.iter() {
882 let chan_capacity = node.lock().await.list_channels().await?.iter().sum::<u64>();
883
884 if let Err(e) = RandomPaymentActivity::validate_capacity(
885 chan_capacity,
886 self.cfg.expected_payment_msat,
887 ) {
888 log::warn!("Node: {} not eligible for activity generation: {e}.", *pk);
889 continue;
890 }
891
892 let capacity = chan_capacity / 2;
895 let node_info = node.lock().await.get_node_info(pk).await?;
896 active_nodes.insert(node_info.pubkey, (node_info, capacity));
897 }
898
899 let network_generator = Arc::new(Mutex::new(
900 NetworkGraphView::new(
901 active_nodes.values().cloned().collect(),
902 self.cfg.seeded_rng.clone(),
903 )
904 .map_err(SimulationError::RandomActivityError)?,
905 ));
906
907 log::info!(
908 "Created network generator: {}.",
909 network_generator.lock().await
910 );
911
912 for (node_info, capacity) in active_nodes.values() {
913 generators.push(ExecutorKit {
914 source_info: node_info.clone(),
915 network_generator: network_generator.clone(),
916 payment_generator: Box::new(
917 RandomPaymentActivity::new(
918 *capacity,
919 self.cfg.expected_payment_msat,
920 self.cfg.activity_multiplier,
921 self.cfg.seeded_rng.clone(),
922 )
923 .map_err(SimulationError::RandomActivityError)?,
924 ),
925 });
926 }
927
928 Ok(generators)
929 }
930
931 fn dispatch_consumers(
934 &self,
935 consuming_nodes: HashSet<PublicKey>,
936 output_sender: Sender<SimulationOutput>,
937 tasks: &mut JoinSet<()>,
938 ) -> HashMap<PublicKey, Sender<SimulationEvent>> {
939 let mut channels = HashMap::new();
940
941 for (id, node) in self
942 .nodes
943 .iter()
944 .filter(|(id, _)| consuming_nodes.contains(id))
945 {
946 let (sender, receiver) = channel(1);
950 channels.insert(*id, sender.clone());
951
952 let ce_listener = self.shutdown_listener.clone();
956 let ce_shutdown = self.shutdown_trigger.clone();
957 let ce_output_sender = output_sender.clone();
958 let ce_node = node.clone();
959 tasks.spawn(async move {
960 let node_info = ce_node.lock().await.get_info().clone();
961 log::debug!("Starting events consumer for {}.", node_info);
962 if let Err(e) =
963 consume_events(ce_node, receiver, ce_output_sender, ce_listener).await
964 {
965 ce_shutdown.trigger();
966 log::error!("Event consumer for node {node_info} exited with error: {e:?}.");
967 } else {
968 log::debug!("Event consumer for node {node_info} completed successfully.");
969 }
970 });
971 }
972
973 channels
974 }
975
976 async fn dispatch_producers(
979 &self,
980 executors: Vec<ExecutorKit>,
981 producer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
982 tasks: &mut JoinSet<()>,
983 ) -> Result<(), SimulationError> {
984 for executor in executors {
985 let sender = producer_channels.get(&executor.source_info.pubkey).ok_or(
986 SimulationError::RandomActivityError(RandomActivityError::ValueError(format!(
987 "Activity producer for: {} not found.",
988 executor.source_info.pubkey,
989 ))),
990 )?;
991
992 let pe_shutdown = self.shutdown_trigger.clone();
994 let pe_listener = self.shutdown_listener.clone();
995 let pe_sender = sender.clone();
996 tasks.spawn(async move {
997 let source = executor.source_info.clone();
998
999 log::info!(
1000 "Starting activity producer for {}: {}.",
1001 source,
1002 executor.payment_generator
1003 );
1004
1005 if let Err(e) = produce_events(
1006 executor.source_info,
1007 executor.network_generator,
1008 executor.payment_generator,
1009 pe_sender,
1010 pe_listener,
1011 )
1012 .await
1013 {
1014 pe_shutdown.trigger();
1015 log::debug!("Activity producer for {source} exited with error {e}.");
1016 } else {
1017 log::debug!("Activity producer for {source} completed successfully.");
1018 }
1019 });
1020 }
1021
1022 Ok(())
1023 }
1024}
1025
1026async fn consume_events(
1029 node: Arc<Mutex<dyn LightningNode>>,
1030 mut receiver: Receiver<SimulationEvent>,
1031 sender: Sender<SimulationOutput>,
1032 listener: Listener,
1033) -> Result<(), SimulationError> {
1034 loop {
1035 select! {
1036 biased;
1037 _ = listener.clone() => {
1038 return Ok(());
1039 },
1040 simulation_event = receiver.recv() => {
1041 if let Some(event) = simulation_event {
1042 match event {
1043 SimulationEvent::SendPayment(dest, amt_msat) => {
1044 let mut node = node.lock().await;
1045
1046 let mut payment = Payment {
1047 source: node.get_info().pubkey,
1048 hash: None,
1049 amount_msat: amt_msat,
1050 destination: dest.pubkey,
1051 dispatch_time: SystemTime::now(),
1052 };
1053
1054 let outcome = match node.send_payment(dest.pubkey, amt_msat).await {
1055 Ok(payment_hash) => {
1056 log::debug!(
1057 "Send payment: {} -> {}: ({}).",
1058 node.get_info(),
1059 dest,
1060 hex::encode(payment_hash.0)
1061 );
1062 payment.hash = Some(payment_hash);
1064 SimulationOutput::SendPaymentSuccess(payment)
1065 }
1066 Err(e) => {
1067 log::error!(
1068 "Error while sending payment {} -> {}.",
1069 node.get_info(),
1070 dest
1071 );
1072
1073 match e {
1074 LightningError::PermanentError(s) => {
1075 return Err(SimulationError::LightningError(LightningError::PermanentError(s)));
1076 }
1077 _ => SimulationOutput::SendPaymentFailure(
1078 payment,
1079 PaymentResult::not_dispatched(),
1080 ),
1081 }
1082 }
1083 };
1084
1085 select!{
1086 biased;
1087 _ = listener.clone() => {
1088 return Ok(())
1089 }
1090 send_result = sender.send(outcome.clone()) => {
1091 if send_result.is_err() {
1092 return Err(SimulationError::MpscChannelError(
1093 format!("Error sending simulation output {outcome:?}.")));
1094 }
1095 }
1096 }
1097 }
1098 }
1099 } else {
1100 return Ok(())
1101 }
1102 }
1103 }
1104 }
1105}
1106
1107async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator + ?Sized>(
1110 source: NodeInfo,
1111 network_generator: Arc<Mutex<N>>,
1112 node_generator: Box<A>,
1113 sender: Sender<SimulationEvent>,
1114 listener: Listener,
1115) -> Result<(), SimulationError> {
1116 let mut current_count = 0;
1117 loop {
1118 if let Some(c) = node_generator.payment_count() {
1119 if c == current_count {
1120 log::info!(
1121 "Payment count has been met for {source}: {c} payments. Stopping the activity."
1122 );
1123 return Ok(());
1124 }
1125 }
1126
1127 let wait = get_payment_delay(current_count, &source, node_generator.as_ref())?;
1128
1129 select! {
1130 biased;
1131 _ = listener.clone() => {
1132 return Ok(());
1133 },
1134 _ = time::sleep(wait) => {
1137 let (destination, capacity) = network_generator.lock().await.choose_destination(source.pubkey).map_err(SimulationError::DestinationGenerationError)?;
1138
1139 let amount = match node_generator.payment_amount(capacity) {
1143 Ok(amt) => {
1144 if amt == 0 {
1145 log::debug!("Skipping zero amount payment for {source} -> {destination}.");
1146 continue;
1147 }
1148 amt
1149 },
1150 Err(e) => {
1151 return Err(SimulationError::PaymentGenerationError(e));
1152 },
1153 };
1154
1155 log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination);
1156
1157 let event = SimulationEvent::SendPayment(destination.clone(), amount);
1159 select!{
1160 biased;
1161 _ = listener.clone() => {
1162 return Ok(());
1163 },
1164 send_result = sender.send(event.clone()) => {
1165 if send_result.is_err(){
1166 return Err(SimulationError::MpscChannelError(
1167 format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
1168 }
1169 },
1170 }
1171
1172 current_count += 1;
1173 },
1174 }
1175 }
1176}
1177
1178fn get_payment_delay<A: PaymentGenerator + ?Sized>(
1181 call_count: u64,
1182 source: &NodeInfo,
1183 node_generator: &A,
1184) -> Result<Duration, SimulationError> {
1185 if call_count != 0 {
1190 let wait = node_generator
1191 .next_payment_wait()
1192 .map_err(SimulationError::PaymentGenerationError)?;
1193 log::debug!("Next payment for {source} in {:?}.", wait);
1194 Ok(wait)
1195 } else if let Some(start) = node_generator.payment_start() {
1196 log::debug!(
1197 "First payment for {source} will be after a start delay of {:?}.",
1198 start
1199 );
1200 Ok(start)
1201 } else {
1202 let wait = node_generator
1203 .next_payment_wait()
1204 .map_err(SimulationError::PaymentGenerationError)?;
1205 log::debug!("First payment for {source} in {:?}.", wait);
1206 Ok(wait)
1207 }
1208}
1209
1210async fn consume_simulation_results(
1211 logger: Arc<Mutex<PaymentResultLogger>>,
1212 mut receiver: Receiver<(Payment, PaymentResult)>,
1213 listener: Listener,
1214 write_results: Option<WriteResults>,
1215) -> Result<(), SimulationError> {
1216 let mut writer = match write_results {
1217 Some(res) => {
1218 let duration = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
1219 let file = res
1220 .results_dir
1221 .join(format!("simulation_{:?}.csv", duration));
1222 let writer = WriterBuilder::new().from_path(file)?;
1223 Some((writer, res.batch_size))
1224 },
1225 None => None,
1226 };
1227
1228 let mut counter = 1;
1229
1230 loop {
1231 select! {
1232 biased;
1233 _ = listener.clone() => {
1234 writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| {
1235 SimulationError::FileError
1236 }))?;
1237 return Ok(());
1238 },
1239 payment_result = receiver.recv() => {
1240 match payment_result {
1241 Some((details, result)) => {
1242 logger.lock().await.report_result(&details, &result);
1243 log::trace!("Resolved dispatched payment: {} with: {}.", details, result);
1244
1245 if let Some((ref mut w, batch_size)) = writer {
1246 w.serialize((details, result)).map_err(|e| {
1247 let _ = w.flush();
1248 SimulationError::CsvError(e)
1249 })?;
1250 counter = counter % batch_size + 1;
1251 if batch_size == counter {
1252 w.flush().map_err(|_| {
1253 SimulationError::FileError
1254 })?;
1255 }
1256 }
1257 },
1258 None => return writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| SimulationError::FileError)),
1259 }
1260 }
1261 }
1262 }
1263}
1264
1265#[derive(Default)]
1267struct PaymentResultLogger {
1268 success_payment: u64,
1269 failed_payment: u64,
1270 total_sent: u64,
1271}
1272
1273impl PaymentResultLogger {
1274 fn new() -> Self {
1275 PaymentResultLogger {
1276 ..Default::default()
1277 }
1278 }
1279
1280 fn report_result(&mut self, details: &Payment, result: &PaymentResult) {
1281 match result.payment_outcome {
1282 PaymentOutcome::Success => self.success_payment += 1,
1283 _ => self.failed_payment += 1,
1284 }
1285
1286 self.total_sent += details.amount_msat;
1287 }
1288
1289 fn total_attempts(&self) -> u64 {
1290 self.success_payment + self.failed_payment
1291 }
1292
1293 fn success_rate(&self) -> f64 {
1294 (self.success_payment as f64 / self.total_attempts() as f64) * 100.0
1295 }
1296}
1297
1298impl Display for PaymentResultLogger {
1299 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1300 write!(
1301 f,
1302 "Processed {} payments sending {} msat total with {:.2}% success rate.",
1303 self.total_attempts(),
1304 self.total_sent,
1305 self.success_rate()
1306 )
1307 }
1308}
1309
1310async fn run_results_logger(
1314 listener: Listener,
1315 logger: Arc<Mutex<PaymentResultLogger>>,
1316 interval: Duration,
1317) {
1318 log::info!("Summary of results will be reported every {:?}.", interval);
1319
1320 loop {
1321 select! {
1322 biased;
1323 _ = listener.clone() => {
1324 break
1325 }
1326
1327 _ = time::sleep(interval) => {
1328 log::info!("{}", logger.lock().await)
1329 }
1330 }
1331 }
1332}
1333
1334async fn produce_simulation_results(
1344 nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
1345 mut output_receiver: Receiver<SimulationOutput>,
1346 results: Sender<(Payment, PaymentResult)>,
1347 listener: Listener,
1348) -> Result<(), SimulationError> {
1349 let mut set = tokio::task::JoinSet::new();
1350
1351 let result = loop {
1352 tokio::select! {
1353 biased;
1354 _ = listener.clone() => {
1355 break Ok(())
1356 },
1357 output = output_receiver.recv() => {
1358 match output {
1359 Some(simulation_output) => {
1360 match simulation_output{
1361 SimulationOutput::SendPaymentSuccess(payment) => {
1362 if let Some(source_node) = nodes.get(&payment.source) {
1363 set.spawn(track_payment_result(
1364 source_node.clone(), results.clone(), payment, listener.clone()
1365 ));
1366 } else {
1367 break Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source)));
1368 }
1369 },
1370 SimulationOutput::SendPaymentFailure(payment, result) => {
1371 select!{
1372 _ = listener.clone() => {
1373 return Ok(());
1374 },
1375 send_result = results.send((payment, result.clone())) => {
1376 if send_result.is_err(){
1377 break Err(SimulationError::MpscChannelError(
1378 format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.",
1379 payment.hash, payment.dispatch_time),
1380 ));
1381 }
1382 },
1383 }
1384 }
1385 };
1386 },
1387 None => break Ok(())
1388 }
1389 }
1390 }
1391 };
1392
1393 log::debug!("Simulation results producer exiting.");
1394 while let Some(res) = set.join_next().await {
1395 if let Err(e) = res {
1396 log::error!("Simulation results producer task exited with error: {e}.");
1397 }
1398 }
1399
1400 result
1401}
1402
1403async fn track_payment_result(
1404 node: Arc<Mutex<dyn LightningNode>>,
1405 results: Sender<(Payment, PaymentResult)>,
1406 payment: Payment,
1407 listener: Listener,
1408) -> Result<(), SimulationError> {
1409 log::trace!("Payment result tracker starting.");
1410
1411 let mut node = node.lock().await;
1412
1413 let res = match payment.hash {
1414 Some(hash) => {
1415 log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0));
1416 let track_payment = node.track_payment(&hash, listener.clone());
1417
1418 match track_payment.await {
1419 Ok(res) => {
1420 log::debug!(
1421 "Track payment {} result: {:?}.",
1422 hex::encode(hash.0),
1423 res.payment_outcome
1424 );
1425 res
1426 },
1427 Err(e) => {
1428 log::error!("Track payment failed for {}: {e}.", hex::encode(hash.0));
1429 PaymentResult::track_payment_failed()
1430 },
1431 }
1432 },
1433 None => {
1435 log::error!(
1436 "We cannot track a payment that has not been dispatched. Missing payment hash."
1437 );
1438 PaymentResult::not_dispatched()
1439 },
1440 };
1441
1442 select! {
1443 biased;
1444 _ = listener.clone() => {
1445 log::debug!("Track payment result received a shutdown signal.");
1446 },
1447 send_payment_result = results.send((payment, res.clone())) => {
1448 if send_payment_result.is_err() {
1449 return Err(SimulationError::MpscChannelError(
1450 format!("Failed to send payment result {res} for payment {payment}.")))
1451 }
1452 }
1453 }
1454
1455 log::trace!("Result tracking complete. Payment result tracker exiting.");
1456
1457 Ok(())
1458}
1459
1460#[cfg(test)]
1461mod tests {
1462 use crate::{get_payment_delay, test_utils, MutRng, PaymentGenerationError, PaymentGenerator};
1463 use mockall::mock;
1464 use std::fmt;
1465 use std::time::Duration;
1466
1467 #[test]
1468 fn create_seeded_mut_rng() {
1469 let seeds = vec![u64::MIN, u64::MAX];
1470
1471 for seed in seeds {
1472 let mut_rng_1 = MutRng::new(Some(seed));
1473 let mut_rng_2 = MutRng::new(Some(seed));
1474
1475 let mut rng_1 = mut_rng_1.0.lock().unwrap();
1476 let mut rng_2 = mut_rng_2.0.lock().unwrap();
1477
1478 assert_eq!(rng_1.next_u64(), rng_2.next_u64())
1479 }
1480 }
1481
1482 #[test]
1483 fn create_unseeded_mut_rng() {
1484 let mut_rng_1 = MutRng::new(None);
1485 let mut_rng_2 = MutRng::new(None);
1486
1487 let mut rng_1 = mut_rng_1.0.lock().unwrap();
1488 let mut rng_2 = mut_rng_2.0.lock().unwrap();
1489
1490 assert_ne!(rng_1.next_u64(), rng_2.next_u64())
1491 }
1492
1493 mock! {
1494 pub Generator {}
1495
1496 impl fmt::Display for Generator {
1497 fn fmt<'a>(&self, f: &mut fmt::Formatter<'a>) -> fmt::Result;
1498 }
1499
1500 impl PaymentGenerator for Generator {
1501 fn payment_start(&self) -> Option<Duration>;
1502 fn payment_count(&self) -> Option<u64>;
1503 fn next_payment_wait(&self) -> Result<Duration, PaymentGenerationError>;
1504 fn payment_amount(&self, destination_capacity: Option<u64>) -> Result<u64, PaymentGenerationError>;
1505 }
1506 }
1507
1508 #[test]
1509 fn test_no_payment_delay() {
1510 let node = test_utils::create_nodes(1, 100_000)
1511 .first()
1512 .unwrap()
1513 .0
1514 .clone();
1515
1516 let mut mock_generator = MockGenerator::new();
1518 mock_generator.expect_payment_start().return_once(|| None);
1519 let payment_interval = Duration::from_secs(5);
1520 mock_generator
1521 .expect_next_payment_wait()
1522 .returning(move || Ok(payment_interval));
1523
1524 assert_eq!(
1525 get_payment_delay(0, &node, &mock_generator).unwrap(),
1526 payment_interval
1527 );
1528 assert_eq!(
1529 get_payment_delay(1, &node, &mock_generator).unwrap(),
1530 payment_interval
1531 );
1532 }
1533
1534 #[test]
1535 fn test_payment_delay() {
1536 let node = test_utils::create_nodes(1, 100_000)
1537 .first()
1538 .unwrap()
1539 .0
1540 .clone();
1541
1542 let mut mock_generator = MockGenerator::new();
1544 let start_delay = Duration::from_secs(10);
1545 mock_generator
1546 .expect_payment_start()
1547 .return_once(move || Some(start_delay));
1548 let payment_interval = Duration::from_secs(5);
1549 mock_generator
1550 .expect_next_payment_wait()
1551 .returning(move || Ok(payment_interval));
1552
1553 assert_eq!(
1554 get_payment_delay(0, &node, &mock_generator).unwrap(),
1555 start_delay
1556 );
1557 assert_eq!(
1558 get_payment_delay(1, &node, &mock_generator).unwrap(),
1559 payment_interval
1560 );
1561 }
1562}