use crate::error::MutinyError;
use crate::nodemanager::NodeManager;
use crate::storage::MutinyStorage;
use crate::utils;
use crate::utils::sleep;
use anyhow::anyhow;
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use bitcoin::{Address, OutPoint};
use lightning::{log_debug, log_error, log_info};
use lightning::{log_warn, util::logger::Logger};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::Ordering;
/// Delay passed to `sleep` right after a new node is created, before that node is used.
// NOTE(review): presumably milliseconds (5 seconds) — confirm against `utils::sleep`.
const NEW_NODE_SLEEP_DURATION: i32 = 5_000;
/// Lifecycle state of a [`Redshift`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RedshiftStatus {
    /// Initial state: the channel funded by the input UTXO has been requested.
    ChannelOpening,
    /// The introduction channel has been recorded as opened.
    ChannelOpened,
    /// Payments to the receiving node are being attempted.
    AttemptingPayments,
    /// Payments are done and the channels are about to be closed.
    ClosingChannels,
    /// The redshift ran to the end.
    Completed,
    /// The redshift failed; the string carries the error description.
    Failed(String),
}
impl RedshiftStatus {
pub fn is_in_progress(&self) -> bool {
match self {
RedshiftStatus::ChannelOpening => true,
RedshiftStatus::ChannelOpened => true,
RedshiftStatus::AttemptingPayments => true,
RedshiftStatus::ClosingChannels => true,
RedshiftStatus::Completed => false,
RedshiftStatus::Failed(_) => false,
}
}
}
/// Where the funds of a [`Redshift`] should end up.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RedshiftRecipient {
    /// Pay to the node with this public key; it is looked up as the
    /// receiving node when payments are attempted.
    Lightning(PublicKey),
    /// Settle on-chain: a fresh receiving node is created for the payments
    /// and its channels are closed afterwards.
    // NOTE(review): the optional address is not read anywhere in this file —
    // presumably the payout address; confirm against the callers.
    OnChain(Option<Address>),
}
/// Persisted record describing one redshift and its progress.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Redshift {
    /// 16-byte identifier (the big-endian bytes of the user channel id);
    /// also used to build the storage key.
    pub id: [u8; 16],
    /// The UTXO that is swept into the introduction channel.
    pub input_utxo: OutPoint,
    /// Current lifecycle state.
    pub status: RedshiftStatus,
    /// Public key of the node that opens the channel and sends the payments.
    pub sending_node: PublicKey,
    /// Requested destination of the funds.
    pub recipient: RedshiftRecipient,
    /// Public key of the node receiving the payments; filled in once
    /// payments are attempted.
    pub receiving_node: Option<PublicKey>,
    /// Not written by the code in this file beyond `None`.
    // NOTE(review): presumably the final on-chain output — confirm.
    pub output_utxo: Option<OutPoint>,
    /// Outpoint of the channel opened with the introduction node, if known.
    pub introduction_channel: Option<OutPoint>,
    /// Outpoints of the receiving node's channels that were closed.
    pub output_channel: Option<Vec<OutPoint>>,
    /// Public key of the peer the introduction channel is opened with.
    pub introduction_node: PublicKey,
    /// Value of the input UTXO.
    pub amount_sats: u64,
    /// Running total of the amounts of successful payments.
    pub sats_sent: u64,
    /// Not written by the code in this file beyond `None`.
    // NOTE(review): presumably leftover change — confirm.
    pub change_amt: Option<u64>,
    /// Fees paid so far: the difference between the UTXO value and the
    /// channel size, plus the fees of each successful payment.
    pub fees_paid: u64,
}
impl Redshift {
pub fn channel_opened(&mut self, chan_id: OutPoint) {
self.introduction_channel = Some(chan_id);
self.status = RedshiftStatus::ChannelOpened;
}
pub fn payment_successful(&mut self, amount: u64, fees_paid: u64) {
self.sats_sent += amount;
self.fees_paid += fees_paid;
}
pub fn fail(&mut self, error: String) {
self.status = RedshiftStatus::Failed(error);
}
}
/// Persistence operations for [`Redshift`] records.
pub trait RedshiftStorage {
    /// Looks up the redshift with the given 16-byte `id`.
    ///
    /// The parameter is the redshift id (the same value stored in
    /// `Redshift::id`), not a UTXO.
    fn get_redshift(&self, id: &[u8; 16]) -> Result<Option<Redshift>, MutinyError>;

    /// Returns every redshift found in storage.
    fn get_redshifts(&self) -> Result<Vec<Redshift>, MutinyError>;

    /// Stores `redshift` under the key derived from its `id`.
    fn persist_redshift(&self, redshift: Redshift) -> Result<(), MutinyError>;
}
/// Prefix of every storage key under which a redshift is persisted; also used to scan for all of them.
const REDSHIFT_KEY_PREFIX: &str = "redshift/";
/// Builds the storage key for a redshift: `REDSHIFT_KEY_PREFIX` followed by
/// the hex encoding of `id`.
fn get_redshift_key(id: &[u8; 16]) -> String {
    let hex_id = id.to_hex();
    format!("{REDSHIFT_KEY_PREFIX}{hex_id}")
}
impl<S: MutinyStorage> RedshiftStorage for S {
    /// Reads the redshift stored under the key derived from `id`.
    fn get_redshift(&self, id: &[u8; 16]) -> Result<Option<Redshift>, MutinyError> {
        let stored: Option<Redshift> = self.get_data(get_redshift_key(id))?;
        Ok(stored)
    }

    /// Scans storage with `REDSHIFT_KEY_PREFIX` and returns the redshifts
    /// found. The order of the result is unspecified (it comes from a
    /// `HashMap`).
    fn get_redshifts(&self) -> Result<Vec<Redshift>, MutinyError> {
        let map: HashMap<String, Redshift> = self.scan(REDSHIFT_KEY_PREFIX, None)?;
        // The map is owned here, so move the values out instead of cloning each one.
        Ok(map.into_values().collect())
    }

    /// Writes `redshift` under the key derived from its `id`.
    fn persist_redshift(&self, redshift: Redshift) -> Result<(), MutinyError> {
        self.set_data(get_redshift_key(&redshift.id), redshift, None)
    }
}
/// High-level operations that drive a [`Redshift`] through its lifecycle.
pub trait RedshiftManager {
    /// Starts a redshift for `utxo` towards `recipient`.
    ///
    /// `introduction_node` optionally names the peer to open the channel
    /// with, and `connection_string` optionally says how to reach it.
    /// Returns the newly created redshift record.
    async fn init_redshift(
        &self,
        utxo: OutPoint,
        recipient: RedshiftRecipient,
        introduction_node: Option<PublicKey>,
        connection_string: Option<&str>,
    ) -> Result<Redshift, MutinyError>;

    /// Looks up the redshift with the given 16-byte `id`.
    fn get_redshift(&self, id: &[u8; 16]) -> Result<Option<Redshift>, MutinyError>;

    /// Tries to move the funds of `rs` from its sending node to the
    /// receiving node via lightning payments.
    async fn attempt_payments(&self, rs: Redshift) -> Result<(), MutinyError>;

    /// Closes the channels that belong to `rs`.
    async fn close_channels(&self, rs: Redshift) -> Result<(), MutinyError>;
}
impl<S: MutinyStorage> RedshiftManager for NodeManager<S> {
    /// Starts a redshift for `utxo`.
    ///
    /// Looks the UTXO up in the wallet, creates a fresh node, picks the
    /// introduction node, sweeps the UTXO into a channel with it and persists
    /// the resulting [`Redshift`] with status `ChannelOpening`.
    ///
    /// The introduction node is chosen as follows:
    /// * pubkey and connection string given: connect the new node to that peer;
    /// * only a pubkey given: the new node must already list it as a peer;
    /// * anything else (including a connection string without a pubkey):
    ///   the new node's LSP is used.
    ///
    /// # Errors
    ///
    /// Fails if the UTXO is not found, the introduction node cannot be
    /// determined or connected to, random bytes cannot be generated, or any
    /// of the underlying node/storage calls fail.
    async fn init_redshift(
        &self,
        utxo: OutPoint,
        recipient: RedshiftRecipient,
        introduction_node: Option<PublicKey>,
        connection_string: Option<&str>,
    ) -> Result<Redshift, MutinyError> {
        // The requested UTXO must be one the wallet knows about.
        let utxos = self.list_utxos()?;
        let u = utxos
            .iter()
            .find(|u| u.outpoint == utxo)
            .ok_or_else(|| MutinyError::Other(anyhow!("Could not find UTXO")))?;

        // Dedicated node for this redshift; wait before using it.
        let node = self.new_node().await?;
        sleep(NEW_NODE_SLEEP_DURATION).await;

        let introduction_node = match (introduction_node, connection_string) {
            (Some(i), Some(c)) => {
                let connect_string = format!("{i}@{c}");
                self.connect_to_peer(&node.pubkey, &connect_string, None)
                    .await?;
                i
            }
            (Some(i), None) => {
                let node = self.get_node(&node.pubkey).await?;
                if node.peer_manager.get_peer_node_ids().contains(&i) {
                    i
                } else {
                    return Err(MutinyError::Other(anyhow!(
                        "Could not connect to introduction node"
                    )));
                }
            }
            _ => {
                let node = self.get_node(&node.pubkey).await?;
                match &node.lsp_client {
                    Some(lsp) => lsp.pubkey,
                    None => return Err(MutinyError::LspGenericError),
                }
            }
        };

        // Random user channel id; its bytes double as the redshift id.
        let mut user_channel_id_bytes = [0u8; 16];
        getrandom::getrandom(&mut user_channel_id_bytes)
            .map_err(|_| MutinyError::Other(anyhow!("Failed to generate user channel id")))?;
        let user_chan_id = u128::from_be_bytes(user_channel_id_bytes);

        let channel = self
            .sweep_utxos_to_channel(
                Some(user_chan_id),
                &node.pubkey,
                &[utxo],
                Some(introduction_node),
            )
            .await?;

        // Whatever did not end up in the channel was spent on fees. Saturate
        // so an unexpected channel size cannot underflow.
        let fees = u.txout.value.saturating_sub(channel.size);

        let redshift = Redshift {
            id: user_chan_id.to_be_bytes(),
            input_utxo: utxo,
            status: RedshiftStatus::ChannelOpening,
            sending_node: node.pubkey,
            recipient,
            receiving_node: None,
            output_utxo: None,
            introduction_channel: channel.outpoint,
            output_channel: None,
            introduction_node,
            amount_sats: u.txout.value,
            sats_sent: 0,
            change_amt: None,
            fees_paid: fees,
        };
        self.storage.persist_redshift(redshift.clone())?;
        Ok(redshift)
    }

    /// Looks up the redshift with the given `id` in storage.
    fn get_redshift(&self, id: &[u8; 16]) -> Result<Option<Redshift>, MutinyError> {
        self.storage.get_redshift(id)
    }

    /// Repeatedly pays invoices from the receiving node with the sending
    /// node until the spendable amount has been sent, the remaining amount
    /// drops below the lightning minimum, invoice creation keeps failing, or
    /// the node manager is stopped. Afterwards the redshift is persisted and
    /// its channels are closed via `close_channels`.
    ///
    /// When a payment fails, the next attempt is reduced by 5% of the
    /// maximum amount.
    ///
    /// # Errors
    ///
    /// Fails if a node cannot be loaded or created, if persisting fails, or
    /// if closing the channels fails.
    ///
    /// # Panics
    ///
    /// Panics if a paid invoice reports no amount.
    async fn attempt_payments(&self, mut rs: Redshift) -> Result<(), MutinyError> {
        // Total number of invoice-creation failures tolerated before giving up.
        const MAX_INVOICE_FAILURES: u32 = 3;

        log_info!(
            &self.logger,
            "Attempting payments for redshift {}",
            rs.id.to_hex()
        );

        let sending_node = self.get_node(&rs.sending_node).await?;

        // Channel reserve of the introduction channel (0 if unknown).
        let reserve = match rs.introduction_channel {
            None => 0,
            Some(chan) => sending_node
                .channel_manager
                .list_channels()
                .iter()
                .find_map(|c| {
                    if c.funding_txo.map(|u| u.into_bitcoin_outpoint()) == Some(chan) {
                        c.unspendable_punishment_reserve
                    } else {
                        None
                    }
                })
                .unwrap_or(0),
        };

        // Saturate: if fees plus reserve exceed the amount, nothing can be
        // sent and the loop below records the failure instead of underflowing.
        let max_sats = rs
            .amount_sats
            .saturating_sub(rs.fees_paid)
            .saturating_sub(reserve);
        let min_sats = utils::min_lightning_amount(self.get_network());

        let receiving_node = match rs.recipient {
            RedshiftRecipient::Lightning(receiving_pubkey) => {
                self.get_node(&receiving_pubkey).await?
            }
            RedshiftRecipient::OnChain(_) => {
                let new_receiving_node = self.new_node().await?.pubkey;
                sleep(NEW_NODE_SLEEP_DURATION).await;
                self.get_node(&new_receiving_node).await?
            }
        };

        rs.receiving_node = Some(receiving_node.pubkey);
        self.storage.persist_redshift(rs.clone())?;

        let mut local_max_sats = max_sats;
        let mut get_invoice_failures: u32 = 0;
        loop {
            if self.stop.load(Ordering::Relaxed) {
                break;
            }

            log_debug!(
                &self.logger,
                "Looping through payments for redshift {}: sats={}",
                rs.id.to_hex(),
                local_max_sats,
            );

            if local_max_sats < min_sats {
                log_debug!(
                    &self.logger,
                    "Local max amount is less than min for redshift {}: sats={}",
                    rs.id.to_hex(),
                    local_max_sats,
                );
                if rs.sats_sent == 0 {
                    log_error!(
                        &self.logger,
                        "No payments were made for redshift {}: sats={}",
                        rs.id.to_hex(),
                        local_max_sats,
                    );
                    rs.fail("no payments were made".to_string());
                } else {
                    rs.status = RedshiftStatus::ClosingChannels;
                }
                break;
            }

            log_debug!(
                &self.logger,
                "Getting an invoice for redshift {}: sats={}",
                rs.id.to_hex(),
                local_max_sats,
            );
            let invoice = match receiving_node
                .create_invoice(Some(local_max_sats), vec!["Redshift".to_string()], None)
                .await
            {
                Ok(i) => i,
                Err(_) => {
                    // Count the failure so the retry loop is bounded.
                    get_invoice_failures += 1;
                    if get_invoice_failures > MAX_INVOICE_FAILURES {
                        log_error!(
                            &self.logger,
                            "Giving up on getting an invoice for redshift {}",
                            rs.id.to_hex(),
                        );
                        break;
                    }
                    log_debug!(
                        &self.logger,
                        "Could not get an invoice, trying again for redshift {}",
                        rs.id.to_hex(),
                    );
                    sleep(1000).await;
                    continue;
                }
            };
            log_debug!(
                &self.logger,
                "created invoice: {}",
                invoice.payment_hash().to_hex()
            );

            let label = format!("Redshift: {}", rs.id.to_hex());
            match sending_node
                .pay_invoice_with_timeout(&invoice, None, None, vec![label])
                .await
            {
                Ok(i) => {
                    if i.paid {
                        let amount_sent = i.amount_sats.expect("invoice must have amount");
                        log_debug!(
                            &self.logger,
                            "successfully paid the redshift invoice {amount_sent} sats"
                        );
                        rs.payment_successful(amount_sent, i.fees_paid.unwrap_or(0));

                        if rs.sats_sent >= max_sats {
                            rs.status = RedshiftStatus::ClosingChannels;
                            break;
                        }

                        self.storage.persist_redshift(rs.clone())?;
                        local_max_sats = max_sats.saturating_sub(rs.sats_sent);
                    } else {
                        log_debug!(&self.logger, "payment still pending...");
                    }
                }
                Err(e) => {
                    log_error!(&self.logger, "could not pay: {e}");
                    // Retry with 5% of the maximum less; always shrink by at
                    // least 1 sat so the loop cannot stall on tiny amounts.
                    let decrement = ((max_sats as f64 * 0.05) as u64).max(1);
                    local_max_sats = local_max_sats.saturating_sub(decrement);
                }
            }
        }

        log_debug!(
            &self.logger,
            "Redshift {} completed with status: {:?}",
            rs.id.to_hex(),
            rs.status
        );
        self.storage.persist_redshift(rs.clone())?;

        self.close_channels(rs).await?;
        Ok(())
    }

    /// Closes the introduction channel (if one is recorded) and, for
    /// on-chain recipients, every funded channel of the receiving node, whose
    /// outpoints are stored in `output_channel`. The redshift is then
    /// persisted as `Completed`, unless it was already marked `Failed`, in
    /// which case the failure is kept.
    ///
    /// # Errors
    ///
    /// Fails if an on-chain redshift has no receiving node, or if a node
    /// lookup, channel close or the final persist fails.
    async fn close_channels(&self, mut rs: Redshift) -> Result<(), MutinyError> {
        match rs.introduction_channel.as_ref() {
            Some(chan) => {
                self.close_channel(chan, false, false).await?
            }
            None => log_debug!(&self.logger, "no introduction channel to close"),
        }

        match &rs.recipient {
            // Nothing more to close when paying a lightning recipient.
            RedshiftRecipient::Lightning(_) => {}
            RedshiftRecipient::OnChain(_addr) => {
                let receiving_node = match &rs.receiving_node {
                    None => {
                        log_error!(
                            &self.logger,
                            "no receiving node for redshift {}, cannot close channels",
                            rs.id.to_hex()
                        );
                        return Err(MutinyError::Other(anyhow!(
                            "No receiving node for on-chain redshift"
                        )));
                    }
                    Some(node) => self.get_node(node).await?,
                };

                let mut channel_outpoints: Vec<OutPoint> = vec![];
                for c in receiving_node.channel_manager.list_channels() {
                    if let Some(funding_txo) = c.funding_txo {
                        let channel_outpoint = funding_txo.into_bitcoin_outpoint();
                        self.close_channel(&channel_outpoint, false, false).await?;
                        channel_outpoints.push(channel_outpoint);
                    }
                }

                rs.output_channel = if channel_outpoints.is_empty() {
                    log_warn!(
                        &self.logger,
                        "Expecting at least one channel from a receiving redshift node to close..."
                    );
                    None
                } else {
                    Some(channel_outpoints)
                };
            }
        }

        // Do not overwrite a recorded failure with `Completed`.
        if !matches!(rs.status, RedshiftStatus::Failed(_)) {
            rs.status = RedshiftStatus::Completed;
        }
        self.storage.persist_redshift(rs)?;
        Ok(())
    }
}
#[cfg(test)]
mod test {
    use crate::storage::MemoryStorage;
    use std::str::FromStr;
    use wasm_bindgen_test::{wasm_bindgen_test as test, wasm_bindgen_test_configure};

    use crate::test_utils::*;

    use super::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Builds a fixed redshift record (all-zero id, on-chain recipient
    /// without an address) for the tests below.
    fn dummy_redshift() -> Redshift {
        let pubkey = PublicKey::from_str(
            "02465ed5be53d04fde66c9418ff14a5f2267723810176c9212b722e542dc1afb1b",
        )
        .unwrap();

        Redshift {
            id: [0u8; 16],
            input_utxo: Default::default(),
            status: RedshiftStatus::ChannelOpening,
            sending_node: pubkey,
            recipient: RedshiftRecipient::OnChain(None),
            receiving_node: None,
            output_utxo: None,
            introduction_channel: None,
            output_channel: None,
            introduction_node: pubkey,
            amount_sats: 69_420,
            sats_sent: 0,
            change_amt: None,
            fees_paid: 123,
        }
    }

    /// A persisted redshift can be read back both by id and through the
    /// list of all redshifts.
    #[test]
    async fn test_redshift_persistence() {
        // Logged name matches the test function so the output is traceable.
        let test_name = "test_redshift_persistence";
        log!("{}", test_name);

        let storage = MemoryStorage::default();
        let rs = dummy_redshift();

        // Nothing is stored yet.
        assert!(storage.get_redshifts().unwrap().is_empty());

        storage.persist_redshift(rs.clone()).unwrap();

        let read = storage.get_redshift(&rs.id).unwrap();
        assert_eq!(read.unwrap(), rs);

        let all = storage.get_redshifts().unwrap();
        assert_eq!(all, vec![rs]);
    }
}