package lnd
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"image/color"
"math/big"
prand "math/rand"
"net"
"path/filepath"
"regexp"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/connmgr"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/go-errors/errors"
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/chanfitness"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/feature"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/nat"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/localchans"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/sweep"
"github.com/lightningnetwork/lnd/ticker"
"github.com/lightningnetwork/lnd/tor"
"github.com/lightningnetwork/lnd/walletunlocker"
"github.com/lightningnetwork/lnd/watchtower/wtclient"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
)
const (
// defaultMinPeers is the peer-count argument that Start hands to
// peerBootstrapper when automatic peer bootstrapping is enabled.
defaultMinPeers = 3
// defaultStableConnDuration is a ten-minute duration.
// NOTE(review): not referenced in the visible part of this file; the name
// suggests the connection lifetime after which a peer link counts as
// stable -- confirm against the reconnect/backoff logic.
defaultStableConnDuration = 10 * time.Minute
// numInstantInitReconnect is a count of 10.
// NOTE(review): not referenced in the visible part of this file;
// presumably the number of persistent peers reconnected to without delay
// at startup -- confirm against establishPersistentConnections.
numInstantInitReconnect = 10
// maxInitReconnectDelay is the bare number 30 (no time unit is attached
// at the declaration).
// NOTE(review): presumably an upper bound, in seconds, on a startup
// reconnect delay -- confirm the unit at the point of use.
maxInitReconnectDelay = 30
)
var (
// ErrPeerNotConnected signals that the peer in question is not
// connected.
ErrPeerNotConnected = errors.New("peer is not connected")
// ErrServerNotActive signals that the server has not finished starting.
ErrServerNotActive = errors.New("server is still in the process of " +
"starting")
// ErrServerShuttingDown signals that the server is shutting down. The
// ContractBreach callback built in newServer returns it once the
// server's quit channel has been closed.
ErrServerShuttingDown = errors.New("server is shutting down")
// validColorRegexp matches a '#' followed by exactly six hexadecimal
// digits (upper or lower case), e.g. "#3399ff".
validColorRegexp = regexp.MustCompile("^#[A-Fa-f0-9]{6}$")
)
// errPeerAlreadyConnected is an error type (see its Error method) that
// carries the peer a connection already exists to.
type errPeerAlreadyConnected struct {
// peer is the already-connected peer; it is included in the error text.
peer *peer
}
// Error renders the error as a human readable string that includes the peer
// we are already connected to.
//
// NOTE: Part of the error interface.
func (e *errPeerAlreadyConnected) Error() string {
	const format = "already connected to peer: %v"

	return fmt.Sprintf(format, e.peer)
}
type server struct {
active int32 stopping int32
start sync.Once
stop sync.Once
identityPriv *btcec.PrivateKey
nodeSigner *netann.NodeSigner
chanStatusMgr *netann.ChanStatusManager
listenAddrs []net.Addr
torController *tor.Controller
natTraversal nat.Traversal
lastDetectedIP net.IP
mu sync.RWMutex
peersByPub map[string]*peer
inboundPeers map[string]*peer
outboundPeers map[string]*peer
peerConnectedListeners map[string][]chan<- lnpeer.Peer
peerDisconnectedListeners map[string][]chan<- struct{}
persistentPeers map[string]bool
persistentPeersBackoff map[string]time.Duration
persistentConnReqs map[string][]*connmgr.ConnReq
persistentRetryCancels map[string]chan struct{}
peerErrors map[string]*queue.CircularBuffer
ignorePeerTermination map[*peer]struct{}
scheduledPeerConnection map[string]func()
cc *chainControl
fundingMgr *fundingManager
chanDB *channeldb.DB
htlcSwitch *htlcswitch.Switch
invoices *invoices.InvoiceRegistry
channelNotifier *channelnotifier.ChannelNotifier
peerNotifier *peernotifier.PeerNotifier
htlcNotifier *htlcswitch.HtlcNotifier
witnessBeacon contractcourt.WitnessBeacon
breachArbiter *breachArbiter
missionControl *routing.MissionControl
chanRouter *routing.ChannelRouter
controlTower routing.ControlTower
authGossiper *discovery.AuthenticatedGossiper
localChanMgr *localchans.Manager
utxoNursery *utxoNursery
sweeper *sweep.UtxoSweeper
chainArb *contractcourt.ChainArbitrator
sphinx *hop.OnionProcessor
towerClient wtclient.Client
connMgr *connmgr.ConnManager
sigPool *lnwallet.SigPool
writePool *pool.Write
readPool *pool.Read
featureMgr *feature.Manager
currentNodeAnn *lnwire.NodeAnnouncement
chansToRestore walletunlocker.ChannelsToRecover
chanSubSwapper *chanbackup.SubSwapper
chanEventStore *chanfitness.ChannelEventStore
quit chan struct{}
wg sync.WaitGroup
}
// parseAddr turns an address string into a net.Addr. When the string cannot
// be split into host and port, the entire string is taken as the host and
// defaultPeerPort is used. Onion hosts yield a *tor.OnionAddr; everything else
// is resolved as a TCP address through cfg.net. A non-numeric port is reported
// as an error.
func parseAddr(address string) (net.Addr, error) {
	// Start from the fallback: no explicit port present.
	host, port := address, defaultPeerPort

	// If the address does carry a port, override the fallback with the
	// parsed components.
	if h, p, splitErr := net.SplitHostPort(address); splitErr == nil {
		portNum, err := strconv.Atoi(p)
		if err != nil {
			return nil, err
		}
		host, port = h, portNum
	}

	// Anything that is not an onion service goes through the configured
	// resolver.
	if !tor.IsOnionHost(host) {
		return cfg.net.ResolveTCPAddr(
			"tcp", net.JoinHostPort(host, strconv.Itoa(port)),
		)
	}

	return &tor.OnionAddr{OnionService: host, Port: port}, nil
}
// noiseDial returns a dial function that establishes a brontide connection,
// authenticated with idPriv, to the given address using cfg.net.Dial as the
// underlying dialer.
//
// The returned function expects its argument to be a *lnwire.NetAddress. Any
// other net.Addr implementation results in an error rather than a panic.
func noiseDial(idPriv *btcec.PrivateKey) func(net.Addr) (net.Conn, error) {
	return func(a net.Addr) (net.Conn, error) {
		lnAddr, ok := a.(*lnwire.NetAddress)
		if !ok {
			return nil, fmt.Errorf("unable to dial %v: expected "+
				"*lnwire.NetAddress, got %T", a, a)
		}

		return brontide.Dial(idPriv, lnAddr, cfg.net.Dial)
	}
}
func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
towerClientDB *wtdb.ClientDB, cc *chainControl,
privKey *btcec.PrivateKey,
chansToRestore walletunlocker.ChannelsToRecover,
chanPredicate chanacceptor.ChannelAcceptor,
torController *tor.Controller) (*server, error) {
var err error
listeners := make([]net.Listener, len(listenAddrs))
for i, listenAddr := range listenAddrs {
listeners[i], err = brontide.NewListener(
privKey, listenAddr.String(),
)
if err != nil {
return nil, err
}
}
globalFeatures := lnwire.NewRawFeatureVector()
if !cfg.ProtocolOptions.LegacyOnion() {
globalFeatures.Set(lnwire.TLVOnionPayloadOptional)
}
if !cfg.ProtocolOptions.NoStaticRemoteKey() {
globalFeatures.Set(lnwire.StaticRemoteKeyOptional)
}
if cfg.ProtocolOptions.AnchorCommitments() {
globalFeatures.Set(lnwire.AnchorsOptional)
}
var serializedPubKey [33]byte
copy(serializedPubKey[:], privKey.PubKey().SerializeCompressed())
graphDir := chanDB.Path()
sharedSecretPath := filepath.Join(graphDir, "sphinxreplay.db")
replayLog := htlcswitch.NewDecayedLog(sharedSecretPath, cc.chainNotifier)
sphinxRouter := sphinx.NewRouter(privKey, activeNetParams.Params, replayLog)
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
)
writePool := pool.NewWrite(
writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout,
)
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout,
)
featureMgr, err := feature.NewManager(feature.Config{
NoTLVOnion: cfg.ProtocolOptions.LegacyOnion(),
NoStaticRemoteKey: cfg.ProtocolOptions.NoStaticRemoteKey(),
NoAnchors: !cfg.ProtocolOptions.AnchorCommitments(),
})
if err != nil {
return nil, err
}
registryConfig := invoices.RegistryConfig{
FinalCltvRejectDelta: defaultFinalCltvRejectDelta,
HtlcHoldDuration: invoices.DefaultHtlcHoldDuration,
Clock: clock.NewDefaultClock(),
AcceptKeySend: cfg.AcceptKeySend,
}
s := &server{
chanDB: chanDB,
cc: cc,
sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.signer),
writePool: writePool,
readPool: readPool,
chansToRestore: chansToRestore,
invoices: invoices.NewRegistry(
chanDB, invoices.NewInvoiceExpiryWatcher(clock.NewDefaultClock()),
®istryConfig,
),
channelNotifier: channelnotifier.New(chanDB),
identityPriv: privKey,
nodeSigner: netann.NewNodeSigner(privKey),
listenAddrs: listenAddrs,
sphinx: hop.NewOnionProcessor(sphinxRouter),
torController: torController,
persistentPeers: make(map[string]bool),
persistentPeersBackoff: make(map[string]time.Duration),
persistentConnReqs: make(map[string][]*connmgr.ConnReq),
persistentRetryCancels: make(map[string]chan struct{}),
peerErrors: make(map[string]*queue.CircularBuffer),
ignorePeerTermination: make(map[*peer]struct{}),
scheduledPeerConnection: make(map[string]func()),
peersByPub: make(map[string]*peer),
inboundPeers: make(map[string]*peer),
outboundPeers: make(map[string]*peer),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}),
featureMgr: featureMgr,
quit: make(chan struct{}),
}
s.witnessBeacon = &preimageBeacon{
wCache: chanDB.NewWitnessCache(),
subscribers: make(map[uint64]*preimageSubscriber),
}
_, currentHeight, err := s.cc.chainIO.GetBestBlock()
if err != nil {
return nil, err
}
s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)
s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
DB: chanDB,
LocalChannelClose: func(pubKey []byte,
request *htlcswitch.ChanClose) {
peer, err := s.FindPeerByPubStr(string(pubKey))
if err != nil {
srvrLog.Errorf("unable to close channel, peer"+
" with %v id can't be found: %v",
pubKey, err,
)
return
}
select {
case peer.localCloseChanReqs <- request:
srvrLog.Infof("Local close channel request "+
"delivered to peer: %x", pubKey[:])
case <-peer.quit:
srvrLog.Errorf("Unable to deliver local close "+
"channel request to peer %x, err: %v",
pubKey[:], err)
}
},
FwdingLog: chanDB.ForwardingLog(),
SwitchPackager: channeldb.NewSwitchPackager(),
ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: s.fetchLastChanUpdate(),
Notifier: s.cc.chainNotifier,
HtlcNotifier: s.htlcNotifier,
FwdEventTicker: ticker.New(htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(htlcswitch.DefaultLogInterval),
AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval),
AllowCircularRoute: cfg.AllowCircularRoute,
RejectHTLC: cfg.RejectHTLC,
Clock: clock.NewDefaultClock(),
HTLCExpiry: htlcswitch.DefaultHTLCExpiry,
}, uint32(currentHeight))
if err != nil {
return nil, err
}
chanStatusMgrCfg := &netann.ChanStatusConfig{
ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
ChanEnableTimeout: cfg.ChanEnableTimeout,
ChanDisableTimeout: cfg.ChanDisableTimeout,
OurPubKey: privKey.PubKey(),
MessageSigner: s.nodeSigner,
IsChannelActive: s.htlcSwitch.HasActiveLink,
ApplyChannelUpdate: s.applyChannelUpdate,
DB: chanDB,
Graph: chanDB.ChannelGraph(),
}
chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
if err != nil {
return nil, err
}
s.chanStatusMgr = chanStatusMgr
if cfg.NAT {
srvrLog.Info("Scanning local network for a UPnP enabled device")
discoveryTimeout := time.Duration(10 * time.Second)
ctx, cancel := context.WithTimeout(
context.Background(), discoveryTimeout,
)
defer cancel()
upnp, err := nat.DiscoverUPnP(ctx)
if err == nil {
s.natTraversal = upnp
} else {
srvrLog.Errorf("Unable to discover a UPnP enabled "+
"device on the local network: %v", err)
srvrLog.Info("Scanning local network for a NAT-PMP " +
"enabled device")
pmp, err := nat.DiscoverPMP(discoveryTimeout)
if err != nil {
err := fmt.Errorf("unable to discover a "+
"NAT-PMP enabled device on the local "+
"network: %v", err)
srvrLog.Error(err)
return nil, err
}
s.natTraversal = pmp
}
}
externalIPStrings := make([]string, len(cfg.ExternalIPs))
for idx, ip := range cfg.ExternalIPs {
externalIPStrings[idx] = ip.String()
}
if s.natTraversal != nil {
listenPorts := make([]uint16, 0, len(listenAddrs))
for _, listenAddr := range listenAddrs {
_, portStr, _ := net.SplitHostPort(listenAddr.String())
port, _ := strconv.Atoi(portStr)
listenPorts = append(listenPorts, uint16(port))
}
ips, err := s.configurePortForwarding(listenPorts...)
if err != nil {
srvrLog.Errorf("Unable to automatically set up port "+
"forwarding using %s: %v",
s.natTraversal.Name(), err)
} else {
srvrLog.Infof("Automatically set up port forwarding "+
"using %s to advertise external IP",
s.natTraversal.Name())
externalIPStrings = append(externalIPStrings, ips...)
}
}
externalIPs, err := lncfg.NormalizeAddresses(
externalIPStrings, strconv.Itoa(defaultPeerPort),
cfg.net.ResolveTCPAddr,
)
if err != nil {
return nil, err
}
selfAddrs := make([]net.Addr, 0, len(externalIPs))
selfAddrs = append(selfAddrs, externalIPs...)
chanGraph := chanDB.ChannelGraph()
color, err := parseHexColor(cfg.Color)
if err != nil {
srvrLog.Errorf("unable to parse color: %v\n", err)
return nil, err
}
alias := cfg.Alias
if alias == "" {
alias = hex.EncodeToString(serializedPubKey[:10])
}
nodeAlias, err := lnwire.NewNodeAlias(alias)
if err != nil {
return nil, err
}
selfNode := &channeldb.LightningNode{
HaveNodeAnnouncement: true,
LastUpdate: time.Now(),
Addresses: selfAddrs,
Alias: nodeAlias.String(),
Features: s.featureMgr.Get(feature.SetNodeAnn),
Color: color,
}
copy(selfNode.PubKeyBytes[:], privKey.PubKey().SerializeCompressed())
nodeAnn, err := selfNode.NodeAnnouncement(false)
if err != nil {
return nil, fmt.Errorf("unable to gen self node ann: %v", err)
}
authSig, err := netann.SignAnnouncement(
s.nodeSigner, s.identityPriv.PubKey(), nodeAnn,
)
if err != nil {
return nil, fmt.Errorf("unable to generate signature for "+
"self node announcement: %v", err)
}
selfNode.AuthSigBytes = authSig.Serialize()
nodeAnn.Signature, err = lnwire.NewSigFromRawSignature(
selfNode.AuthSigBytes,
)
if err != nil {
return nil, err
}
if err := chanGraph.SetSourceNode(selfNode); err != nil {
return nil, fmt.Errorf("can't set self node: %v", err)
}
s.currentNodeAnn = nodeAnn
sequencer, err := htlcswitch.NewPersistentSequencer(chanDB)
if err != nil {
return nil, err
}
queryBandwidth := func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
cid := lnwire.NewChanIDFromOutPoint(&edge.ChannelPoint)
link, err := s.htlcSwitch.GetLink(cid)
if err != nil {
return 0
}
if !link.EligibleToForward() {
return 0
}
return link.Bandwidth()
}
routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
s.missionControl, err = routing.NewMissionControl(
chanDB,
&routing.MissionControlConfig{
AprioriHopProbability: routingConfig.AprioriHopProbability,
PenaltyHalfLife: routingConfig.PenaltyHalfLife,
MaxMcHistory: routingConfig.MaxMcHistory,
AprioriWeight: routingConfig.AprioriWeight,
SelfNode: selfNode.PubKeyBytes,
MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
},
)
if err != nil {
return nil, fmt.Errorf("can't create mission control: %v", err)
}
srvrLog.Debugf("Instantiating payment session source with config: "+
"PaymentAttemptPenalty=%v, MinRouteProbability=%v",
int64(routingConfig.AttemptCost),
routingConfig.MinRouteProbability)
pathFindingConfig := routing.PathFindingConfig{
PaymentAttemptPenalty: lnwire.NewMSatFromSatoshis(
routingConfig.AttemptCost,
),
MinProbability: routingConfig.MinRouteProbability,
}
paymentSessionSource := &routing.SessionSource{
Graph: chanGraph,
MissionControl: s.missionControl,
QueryBandwidth: queryBandwidth,
PathFindingConfig: pathFindingConfig,
}
paymentControl := channeldb.NewPaymentControl(chanDB)
s.controlTower = routing.NewControlTower(paymentControl)
s.chanRouter, err = routing.New(routing.Config{
Graph: chanGraph,
Chain: cc.chainIO,
ChainView: cc.chainView,
Payer: s.htlcSwitch,
Control: s.controlTower,
MissionControl: s.missionControl,
SessionSource: paymentSessionSource,
ChannelPruneExpiry: routing.DefaultChannelPruneExpiry,
GraphPruneInterval: time.Duration(time.Hour),
QueryBandwidth: queryBandwidth,
AssumeChannelValid: cfg.Routing.UseAssumeChannelValid(),
NextPaymentID: sequencer.NextID,
PathFindingConfig: pathFindingConfig,
Clock: clock.NewDefaultClock(),
})
if err != nil {
return nil, fmt.Errorf("can't create router: %v", err)
}
chanSeries := discovery.NewChanSeries(s.chanDB.ChannelGraph())
gossipMessageStore, err := discovery.NewMessageStore(s.chanDB)
if err != nil {
return nil, err
}
waitingProofStore, err := channeldb.NewWaitingProofStore(s.chanDB)
if err != nil {
return nil, err
}
s.authGossiper = discovery.New(discovery.Config{
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries,
NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline,
SelfNodeAnnouncement: func(refresh bool) (lnwire.NodeAnnouncement, error) {
return s.genNodeAnnouncement(refresh)
},
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitTicker: ticker.New(time.Minute * 30),
RebroadcastInterval: time.Hour * 24,
WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,
RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval),
NumActiveSyncers: cfg.NumGraphSyncPeers,
MinimumBatchSize: 10,
SubBatchDelay: time.Second * 5,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
},
s.identityPriv.PubKey(),
)
s.localChanMgr = &localchans.Manager{
ForAllOutgoingChannels: s.chanRouter.ForAllOutgoingChannels,
PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies,
FetchChannel: s.chanDB.FetchChannel,
}
utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB)
if err != nil {
srvrLog.Errorf("unable to create nursery store: %v", err)
return nil, err
}
srvrLog.Tracef("Sweeper batch window duration: %v",
sweep.DefaultBatchWindowDuration)
sweeperStore, err := sweep.NewSweeperStore(
chanDB, activeNetParams.GenesisHash,
)
if err != nil {
srvrLog.Errorf("unable to create sweeper store: %v", err)
return nil, err
}
s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
FeeEstimator: cc.feeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.wallet),
Signer: cc.wallet.Cfg.Signer,
Wallet: cc.wallet,
NewBatchTimer: func() <-chan time.Time {
return time.NewTimer(sweep.DefaultBatchWindowDuration).C
},
Notifier: cc.chainNotifier,
Store: sweeperStore,
MaxInputsPerTx: sweep.DefaultMaxInputsPerTx,
MaxSweepAttempts: sweep.DefaultMaxSweepAttempts,
NextAttemptDeltaFunc: sweep.DefaultNextAttemptDeltaFunc,
MaxFeeRate: sweep.DefaultMaxFeeRate,
FeeRateBucketSize: sweep.DefaultFeeRateBucketSize,
})
s.utxoNursery = newUtxoNursery(&NurseryConfig{
ChainIO: cc.chainIO,
ConfDepth: 1,
FetchClosedChannels: chanDB.FetchClosedChannels,
FetchClosedChannel: chanDB.FetchClosedChannel,
Notifier: cc.chainNotifier,
PublishTransaction: cc.wallet.PublishTransaction,
Store: utxnStore,
SweepInput: s.sweeper.SweepInput,
})
closeLink := func(chanPoint *wire.OutPoint,
closureType htlcswitch.ChannelCloseType) {
s.htlcSwitch.CloseLink(chanPoint, closureType, 0, nil)
}
contractBreaches := make(chan *ContractBreachEvent, 1)
s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{
ChainHash: *activeNetParams.GenesisHash,
IncomingBroadcastDelta: DefaultIncomingBroadcastDelta,
OutgoingBroadcastDelta: DefaultOutgoingBroadcastDelta,
NewSweepAddr: newSweepPkScriptGen(cc.wallet),
PublishTx: cc.wallet.PublishTransaction,
DeliverResolutionMsg: func(msgs ...contractcourt.ResolutionMsg) error {
for _, msg := range msgs {
err := s.htlcSwitch.ProcessContractResolution(msg)
if err != nil {
return err
}
}
return nil
},
IncubateOutputs: func(chanPoint wire.OutPoint,
outHtlcRes *lnwallet.OutgoingHtlcResolution,
inHtlcRes *lnwallet.IncomingHtlcResolution,
broadcastHeight uint32) error {
var (
inRes []lnwallet.IncomingHtlcResolution
outRes []lnwallet.OutgoingHtlcResolution
)
if inHtlcRes != nil {
inRes = append(inRes, *inHtlcRes)
}
if outHtlcRes != nil {
outRes = append(outRes, *outHtlcRes)
}
return s.utxoNursery.IncubateOutputs(
chanPoint, outRes, inRes,
broadcastHeight,
)
},
PreimageDB: s.witnessBeacon,
Notifier: cc.chainNotifier,
Signer: cc.wallet.Cfg.Signer,
FeeEstimator: cc.feeEstimator,
ChainIO: cc.chainIO,
MarkLinkInactive: func(chanPoint wire.OutPoint) error {
chanID := lnwire.NewChanIDFromOutPoint(&chanPoint)
s.htlcSwitch.RemoveLink(chanID)
return nil
},
IsOurAddress: cc.wallet.IsOurAddress,
ContractBreach: func(chanPoint wire.OutPoint,
breachRet *lnwallet.BreachRetribution) error {
event := &ContractBreachEvent{
ChanPoint: chanPoint,
ProcessACK: make(chan error, 1),
BreachRetribution: breachRet,
}
select {
case contractBreaches <- event:
case <-s.quit:
return ErrServerShuttingDown
}
select {
case err := <-event.ProcessACK:
return err
case <-s.quit:
return ErrServerShuttingDown
}
},
DisableChannel: s.chanStatusMgr.RequestDisable,
Sweeper: s.sweeper,
Registry: s.invoices,
NotifyClosedChannel: s.channelNotifier.NotifyClosedChannelEvent,
OnionProcessor: s.sphinx,
PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC,
Clock: clock.NewDefaultClock(),
}, chanDB)
s.breachArbiter = newBreachArbiter(&BreachConfig{
CloseLink: closeLink,
DB: chanDB,
Estimator: s.cc.feeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.wallet),
Notifier: cc.chainNotifier,
PublishTransaction: cc.wallet.PublishTransaction,
ContractBreaches: contractBreaches,
Signer: cc.wallet.Cfg.Signer,
Store: newRetributionStore(chanDB),
})
primaryChain := registeredChains.PrimaryChain()
chainCfg := cfg.Bitcoin
minRemoteDelay := minBtcRemoteDelay
maxRemoteDelay := maxBtcRemoteDelay
if primaryChain == litecoinChain {
chainCfg = cfg.Litecoin
minRemoteDelay = minLtcRemoteDelay
maxRemoteDelay = maxLtcRemoteDelay
}
var chanIDSeed [32]byte
if _, err := rand.Read(chanIDSeed[:]); err != nil {
return nil, err
}
s.fundingMgr, err = newFundingManager(fundingConfig{
IDKey: privKey.PubKey(),
Wallet: cc.wallet,
PublishTransaction: cc.wallet.PublishTransaction,
Notifier: cc.chainNotifier,
FeeEstimator: cc.feeEstimator,
SignMessage: func(pubKey *btcec.PublicKey,
msg []byte) (input.Signature, error) {
if pubKey.IsEqual(privKey.PubKey()) {
return s.nodeSigner.SignMessage(pubKey, msg)
}
return cc.msgSigner.SignMessage(pubKey, msg)
},
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
return s.genNodeAnnouncement(true)
},
SendAnnouncement: func(msg lnwire.Message,
optionalFields ...discovery.OptionalMsgField) chan error {
return s.authGossiper.ProcessLocalAnnouncement(
msg, privKey.PubKey(), optionalFields...,
)
},
NotifyWhenOnline: s.NotifyWhenOnline,
TempChanIDSeed: chanIDSeed,
FindChannel: func(chanID lnwire.ChannelID) (
*channeldb.OpenChannel, error) {
dbChannels, err := chanDB.FetchAllChannels()
if err != nil {
return nil, err
}
for _, channel := range dbChannels {
if chanID.IsChanPoint(&channel.FundingOutpoint) {
return channel, nil
}
}
return nil, fmt.Errorf("unable to find channel")
},
DefaultRoutingPolicy: cc.routingPolicy,
DefaultMinHtlcIn: cc.minHtlcIn,
NumRequiredConfs: func(chanAmt btcutil.Amount,
pushAmt lnwire.MilliSatoshi) uint16 {
defaultConf := uint16(chainCfg.DefaultNumChanConfs)
if defaultConf != 0 {
return defaultConf
}
minConf := uint64(3)
maxConf := uint64(6)
maxChannelSize := uint64(
lnwire.NewMSatFromSatoshis(MaxFundingAmount))
stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt
conf := maxConf * uint64(stake) / maxChannelSize
if conf < minConf {
conf = minConf
}
if conf > maxConf {
conf = maxConf
}
return uint16(conf)
},
RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 {
defaultDelay := uint16(chainCfg.DefaultRemoteDelay)
if defaultDelay > 0 {
return defaultDelay
}
delay := uint16(btcutil.Amount(maxRemoteDelay) *
chanAmt / MaxFundingAmount)
if delay < minRemoteDelay {
delay = minRemoteDelay
}
if delay > maxRemoteDelay {
delay = maxRemoteDelay
}
return delay
},
WatchNewChannel: func(channel *channeldb.OpenChannel,
peerKey *btcec.PublicKey) error {
s.mu.Lock()
pubStr := string(peerKey.SerializeCompressed())
if _, ok := s.persistentPeers[pubStr]; !ok {
s.persistentPeers[pubStr] = false
}
s.mu.Unlock()
return s.chainArb.WatchNewChannel(channel)
},
ReportShortChanID: func(chanPoint wire.OutPoint) error {
cid := lnwire.NewChanIDFromOutPoint(&chanPoint)
return s.htlcSwitch.UpdateShortChanID(cid)
},
RequiredRemoteChanReserve: func(chanAmt,
dustLimit btcutil.Amount) btcutil.Amount {
reserve := chanAmt / 100
if reserve < dustLimit {
reserve = dustLimit
}
return reserve
},
RequiredRemoteMaxValue: func(chanAmt btcutil.Amount) lnwire.MilliSatoshi {
reserve := lnwire.NewMSatFromSatoshis(chanAmt / 100)
return lnwire.NewMSatFromSatoshis(chanAmt) - reserve
},
RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 {
return uint16(input.MaxHTLCNumber / 2)
},
ZombieSweeperInterval: 1 * time.Minute,
ReservationTimeout: 10 * time.Minute,
MinChanSize: btcutil.Amount(cfg.MinChanSize),
MaxPendingChannels: cfg.MaxPendingChannels,
RejectPush: cfg.RejectPush,
NotifyOpenChannelEvent: s.channelNotifier.NotifyOpenChannelEvent,
OpenChannelPredicate: chanPredicate,
NotifyPendingOpenChannelEvent: s.channelNotifier.NotifyPendingOpenChannelEvent,
})
if err != nil {
return nil, err
}
chanNotifier := &channelNotifier{
chanNotifier: s.channelNotifier,
addrs: s.chanDB,
}
backupFile := chanbackup.NewMultiFile(cfg.BackupFilePath)
startingChans, err := chanbackup.FetchStaticChanBackups(s.chanDB)
if err != nil {
return nil, err
}
s.chanSubSwapper, err = chanbackup.NewSubSwapper(
startingChans, chanNotifier, s.cc.keyRing, backupFile,
)
if err != nil {
return nil, err
}
s.peerNotifier = peernotifier.New()
s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
SubscribeChannelEvents: s.channelNotifier.SubscribeChannelEvents,
SubscribePeerEvents: s.peerNotifier.SubscribePeerEvents,
GetOpenChannels: s.chanDB.FetchAllOpenChannels,
})
if cfg.WtClient.Active {
policy := wtpolicy.DefaultPolicy()
if cfg.WtClient.SweepFeeRate != 0 {
sweepRateSatPerByte := chainfee.SatPerKVByte(
1000 * cfg.WtClient.SweepFeeRate,
)
policy.SweepFeeRate = sweepRateSatPerByte.FeePerKWeight()
}
if err := policy.Validate(); err != nil {
return nil, err
}
s.towerClient, err = wtclient.New(&wtclient.Config{
Signer: cc.wallet.Cfg.Signer,
NewAddress: newSweepPkScriptGen(cc.wallet),
SecretKeyRing: s.cc.keyRing,
Dial: cfg.net.Dial,
AuthDial: wtclient.AuthDial,
DB: towerClientDB,
Policy: policy,
ChainHash: *activeNetParams.GenesisHash,
MinBackoff: 10 * time.Second,
MaxBackoff: 5 * time.Minute,
ForceQuitDelay: wtclient.DefaultForceQuitDelay,
})
if err != nil {
return nil, err
}
}
cmgr, err := connmgr.New(&connmgr.Config{
Listeners: listeners,
OnAccept: s.InboundPeerConnected,
RetryDuration: time.Second * 5,
TargetOutbound: 100,
Dial: noiseDial(s.identityPriv),
OnConnection: s.OutboundPeerConnected,
})
if err != nil {
return nil, err
}
s.connMgr = cmgr
return s, nil
}
// Started reports whether the server has completed its start-up sequence,
// i.e. whether the active flag has been set.
func (s *server) Started() bool {
	active := atomic.LoadInt32(&s.active)

	return active != 0
}
// Start brings up the server and all of its subsystems. The start-up sequence
// runs at most once; the first failure aborts it and is returned to the
// caller whose invocation triggered the sequence. Subsystems that were
// already started at that point are left running.
func (s *server) Start() error {
	// launch holds the actual start-up sequence. It marks the server as
	// active only if every step succeeded.
	launch := func() error {
		if s.torController != nil {
			if err := s.createNewHiddenService(); err != nil {
				return err
			}
		}

		// Keep an eye on our external IP while a NAT device is in
		// use.
		if s.natTraversal != nil {
			s.wg.Add(1)
			go s.watchExternalIP()
		}

		// Worker pools first, followed by the individual subsystems
		// in the order below.
		if err := s.sigPool.Start(); err != nil {
			return err
		}
		if err := s.writePool.Start(); err != nil {
			return err
		}
		if err := s.readPool.Start(); err != nil {
			return err
		}
		if err := s.cc.chainNotifier.Start(); err != nil {
			return err
		}
		if err := s.channelNotifier.Start(); err != nil {
			return err
		}
		if err := s.peerNotifier.Start(); err != nil {
			return err
		}
		if err := s.htlcNotifier.Start(); err != nil {
			return err
		}
		if err := s.sphinx.Start(); err != nil {
			return err
		}
		if s.towerClient != nil {
			if err := s.towerClient.Start(); err != nil {
				return err
			}
		}
		if err := s.htlcSwitch.Start(); err != nil {
			return err
		}
		if err := s.sweeper.Start(); err != nil {
			return err
		}
		if err := s.utxoNursery.Start(); err != nil {
			return err
		}
		if err := s.chainArb.Start(); err != nil {
			return err
		}
		if err := s.breachArbiter.Start(); err != nil {
			return err
		}
		if err := s.authGossiper.Start(); err != nil {
			return err
		}
		if err := s.chanRouter.Start(); err != nil {
			return err
		}
		if err := s.fundingMgr.Start(); err != nil {
			return err
		}
		if err := s.invoices.Start(); err != nil {
			return err
		}
		if err := s.chanStatusMgr.Start(); err != nil {
			return err
		}
		if err := s.chanEventStore.Start(); err != nil {
			return err
		}

		// Recover any channels handed to us as packed backups:
		// singles first, then the multi backup.
		chanRestorer := &chanDBRestorer{
			db:         s.chanDB,
			secretKeys: s.cc.keyRing,
			chainArb:   s.chainArb,
		}
		if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
			err := chanbackup.UnpackAndRecoverSingles(
				s.chansToRestore.PackedSingleChanBackups,
				s.cc.keyRing, chanRestorer, s,
			)
			if err != nil {
				return fmt.Errorf("unable to unpack single "+
					"backups: %v", err)
			}
		}
		if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
			err := chanbackup.UnpackAndRecoverMulti(
				s.chansToRestore.PackedMultiChanBackup,
				s.cc.keyRing, chanRestorer, s,
			)
			if err != nil {
				return fmt.Errorf("unable to unpack chan "+
					"backup: %v", err)
			}
		}

		if err := s.chanSubSwapper.Start(); err != nil {
			return err
		}

		s.connMgr.Start()

		if err := s.chanDB.PruneLinkNodes(); err != nil {
			return err
		}
		if err := s.establishPersistentConnections(); err != nil {
			return err
		}

		// Automatic peer bootstrapping is skipped when disabled by
		// configuration or when running on simnet or regtest.
		onSimNet := cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet
		onRegTest := cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest
		if cfg.NoNetBootstrap || onSimNet || onRegTest {
			srvrLog.Infof("Auto peer bootstrapping is disabled")
		} else {
			bootstrappers, err := initNetworkBootstrappers(s)
			if err != nil {
				return err
			}

			s.wg.Add(1)
			go s.peerBootstrapper(defaultMinPeers, bootstrappers)
		}

		atomic.StoreInt32(&s.active, 1)

		return nil
	}

	var startErr error
	s.start.Do(func() {
		startErr = launch()
	})

	return startErr
}
// Stop shuts the server down. The shutdown sequence runs at most once (it is
// guarded by s.stop), and Stop always returns nil.
func (s *server) Stop() error {
s.stop.Do(func() {
// Flag the server as stopping so that Stopped reports true from here
// on.
atomic.StoreInt32(&s.stopping, 1)
// Closing quit signals every goroutine selecting on s.quit (for
// example watchExternalIP) to exit.
close(s.quit)
// Shut down the subsystems in the order below. Whatever these Stop
// calls return, if anything, is not inspected.
s.chanStatusMgr.Stop()
s.cc.chainNotifier.Stop()
s.chanRouter.Stop()
s.htlcSwitch.Stop()
s.sphinx.Stop()
s.utxoNursery.Stop()
s.breachArbiter.Stop()
s.authGossiper.Stop()
s.chainArb.Stop()
s.sweeper.Stop()
s.channelNotifier.Stop()
s.peerNotifier.Stop()
s.htlcNotifier.Stop()
s.cc.wallet.Shutdown()
s.cc.chainView.Stop()
s.connMgr.Stop()
s.cc.feeEstimator.Stop()
s.invoices.Stop()
s.fundingMgr.Stop()
s.chanSubSwapper.Stop()
s.chanEventStore.Stop()
// Disconnect every peer returned by s.Peers(); the result of
// DisconnectPeer is discarded.
for _, peer := range s.Peers() {
s.DisconnectPeer(peer.addr.IdentityKey)
}
// The tower client only exists when newServer created one
// (cfg.WtClient.Active).
if s.towerClient != nil {
s.towerClient.Stop()
}
// Wait for the goroutines tracked by s.wg before the worker pools are
// torn down.
s.wg.Wait()
s.sigPool.Stop()
s.writePool.Stop()
s.readPool.Stop()
})
return nil
}
// Stopped reports whether a shutdown of the server has been initiated, i.e.
// whether the stopping flag has been set.
func (s *server) Stopped() bool {
	stopping := atomic.LoadInt32(&s.stopping)

	return stopping != 0
}
// configurePortForwarding asks the NAT traversal device to forward each of
// the given ports and returns the resulting external addresses in host:port
// form. The detected external IP is remembered in s.lastDetectedIP.
//
// An error is returned only when the external IP cannot be determined. A port
// that cannot be forwarded is logged at debug level and left out of the
// result, so the returned slice may be shorter than ports (or empty).
func (s *server) configurePortForwarding(ports ...uint16) ([]string, error) {
	ip, err := s.natTraversal.ExternalIP()
	if err != nil {
		return nil, err
	}
	s.lastDetectedIP = ip

	externalIPs := make([]string, 0, len(ports))
	for _, port := range ports {
		if err := s.natTraversal.AddPortMapping(port); err != nil {
			srvrLog.Debugf("Unable to forward port %d: %v", port, err)
			continue
		}

		// JoinHostPort brackets IPv6 literals, so the result is an
		// unambiguous host:port string for either address family.
		hostIP := net.JoinHostPort(
			ip.String(), strconv.Itoa(int(port)),
		)
		externalIPs = append(externalIPs, hostIP)
	}

	return externalIPs, nil
}
// removePortForwarding deletes the mapping of every port the NAT traversal
// device reports as forwarded. A failure to delete a mapping is logged and
// does not stop the remaining ports from being processed.
func (s *server) removePortForwarding() {
	for _, port := range s.natTraversal.ForwardedPorts() {
		err := s.natTraversal.DeletePortMapping(port)
		if err == nil {
			continue
		}

		srvrLog.Errorf("Unable to remove forwarding rules for "+
			"port %d: %v", port, err)
	}
}
// watchExternalIP polls the NAT traversal device every 15 minutes. On each
// tick it re-creates the port mappings and, if the external IP differs from
// the last one detected, generates and broadcasts a node announcement that
// advertises the new address.
//
// It runs until s.quit is closed; on exit it removes the port forwarding
// rules and signals s.wg.
//
// NOTE: This MUST be run as a goroutine.
func (s *server) watchExternalIP() {
	defer s.wg.Done()

	// Before exiting, remove the forwarding rules that were set up.
	defer s.removePortForwarding()

	// Remember which addresses the user configured explicitly so that
	// they are retained in any updated announcement.
	ipsSetByUser := make(map[string]struct{}, len(cfg.ExternalIPs))
	for _, ip := range cfg.ExternalIPs {
		ipsSetByUser[ip.String()] = struct{}{}
	}

	forwardedPorts := s.natTraversal.ForwardedPorts()

	// Named ipTicker to avoid shadowing the imported ticker package.
	ipTicker := time.NewTicker(15 * time.Minute)
	defer ipTicker.Stop()

	for {
		select {
		case <-ipTicker.C:
			ip, err := s.natTraversal.ExternalIP()
			if err != nil {
				srvrLog.Debugf("Unable to retrieve the "+
					"external IP address: %v", err)
				continue
			}

			// Re-create the mappings on every tick, whether or
			// not the IP changed.
			for _, port := range forwardedPorts {
				err := s.natTraversal.AddPortMapping(port)
				if err != nil {
					srvrLog.Warnf("Unable to automatically "+
						"re-create port forwarding using %s: %v",
						s.natTraversal.Name(), err)
				} else {
					srvrLog.Debugf("Automatically re-created "+
						"forwarding for port %d using %s to "+
						"advertise external IP",
						port, s.natTraversal.Name())
				}
			}

			// Nothing more to do if the IP is unchanged.
			if ip.Equal(s.lastDetectedIP) {
				continue
			}

			srvrLog.Infof("Detected new external IP address %s", ip)

			// Build the new set of addresses: the new IP combined
			// with every forwarded port.
			var newAddrs []net.Addr
			for _, port := range forwardedPorts {
				// JoinHostPort brackets IPv6 literals so the
				// string is a valid host:port either way.
				hostIP := net.JoinHostPort(
					ip.String(), strconv.Itoa(int(port)),
				)
				addr, err := net.ResolveTCPAddr("tcp", hostIP)
				if err != nil {
					// addr is nil on failure, so log the
					// string we tried to resolve.
					srvrLog.Debugf("Unable to resolve "+
						"host %v: %v", hostIP, err)
					continue
				}

				newAddrs = append(newAddrs, addr)
			}

			if len(newAddrs) == 0 {
				srvrLog.Debug("Skipping node announcement " +
					"update due to not being able to " +
					"resolve any new addresses")
				continue
			}

			currentNodeAnn, err := s.genNodeAnnouncement(false)
			if err != nil {
				srvrLog.Debugf("Unable to retrieve current "+
					"node announcement: %v", err)
				continue
			}

			// Carry over the currently announced addresses,
			// dropping those on the previously detected IP unless
			// the user configured them explicitly.
			for _, addr := range currentNodeAnn.Addresses {
				host, _, err := net.SplitHostPort(addr.String())
				if err != nil {
					srvrLog.Debugf("Unable to determine "+
						"host from address %v: %v",
						addr, err)
					continue
				}

				_, setByUser := ipsSetByUser[addr.String()]
				if setByUser || host != s.lastDetectedIP.String() {
					newAddrs = append(newAddrs, addr)
				}
			}

			newNodeAnn, err := s.genNodeAnnouncement(
				true, netann.NodeAnnSetAddrs(newAddrs),
			)
			if err != nil {
				srvrLog.Debugf("Unable to generate new node "+
					"announcement: %v", err)
				continue
			}

			err = s.BroadcastMessage(nil, &newNodeAnn)
			if err != nil {
				srvrLog.Debugf("Unable to broadcast new node "+
					"announcement to peers: %v", err)
				continue
			}

			// Only record the new IP once the announcement went
			// out, so that a failed attempt is retried on the
			// next tick.
			s.lastDetectedIP = ip

		case <-s.quit:
			return
		}
	}
}
// initNetworkBootstrappers builds the list of peer bootstrapping sources. The
// list always starts with a bootstrapper backed by the channel graph stored in
// the server's database. A DNS seed bootstrapper is appended when DNS seeds
// are registered for the active network's genesis hash.
//
// NOTE(review): the DNS branch is skipped only when BOTH the Bitcoin and
// Litecoin configs have SimNet set; confirm this is the intended condition.
func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) {
	srvrLog.Infof("Initializing peer network bootstrappers!")

	graph := autopilot.ChannelGraphFromDatabase(s.chanDB.ChannelGraph())
	graphSource, err := discovery.NewGraphBootstrapper(graph)
	if err != nil {
		return nil, err
	}
	sources := []discovery.NetworkPeerBootstrapper{graphSource}

	if cfg.Bitcoin.SimNet && cfg.Litecoin.SimNet {
		return sources, nil
	}

	seeds, found := chainDNSSeeds[*activeNetParams.GenesisHash]
	if !found {
		return sources, nil
	}

	srvrLog.Infof("Creating DNS peer bootstrapper with "+
		"seeds: %v", seeds)

	dnsSource := discovery.NewDNSSeedBootstrapper(seeds, cfg.net)
	sources = append(sources, dnsSource)

	return sources, nil
}
func (s *server) peerBootstrapper(numTargetPeers uint32,
bootstrappers []discovery.NetworkPeerBootstrapper) {
defer s.wg.Done()
ignore := make(map[autopilot.NodeID]struct{})
s.initialPeerBootstrap(ignore, numTargetPeers, bootstrappers)
backOff := time.Second * 15
sampleTicker := time.NewTicker(backOff)
defer sampleTicker.Stop()
var epochErrors uint32 var epochAttempts uint32
for {
select {
case <-sampleTicker.C:
s.mu.RLock()
numActivePeers := uint32(len(s.peersByPub))
s.mu.RUnlock()
if numActivePeers >= numTargetPeers {
continue
}
if epochAttempts > 0 &&
atomic.LoadUint32(&epochErrors) >= epochAttempts {
sampleTicker.Stop()
backOff *= 2
if backOff > bootstrapBackOffCeiling {
backOff = bootstrapBackOffCeiling
}
srvrLog.Debugf("Backing off peer bootstrapper to "+
"%v", backOff)
sampleTicker = time.NewTicker(backOff)
continue
}
atomic.StoreUint32(&epochErrors, 0)
epochAttempts = 0
numNeeded := numTargetPeers - numActivePeers
srvrLog.Debugf("Attempting to obtain %v more network "+
"peers", numNeeded)
s.mu.RLock()
ignoreList := make(map[autopilot.NodeID]struct{})
for _, peer := range s.peersByPub {
nID := autopilot.NewNodeID(peer.addr.IdentityKey)
ignoreList[nID] = struct{}{}
}
s.mu.RUnlock()
peerAddrs, err := discovery.MultiSourceBootstrap(
ignoreList, numNeeded*2, bootstrappers...,
)
if err != nil {
srvrLog.Errorf("Unable to retrieve bootstrap "+
"peers: %v", err)
continue
}
for _, addr := range peerAddrs {
epochAttempts++
go func(a *lnwire.NetAddress) {
errChan := make(chan error, 1)
s.connectToPeer(a, errChan)
select {
case err := <-errChan:
if err == nil {
return
}
srvrLog.Errorf("Unable to "+
"connect to %v: %v",
a, err)
atomic.AddUint32(&epochErrors, 1)
case <-s.quit:
}
}(addr)
}
case <-s.quit:
return
}
}
}
// bootstrapBackOffCeiling is the upper bound on the ticker interval used by
// peerBootstrapper when it backs off. initialPeerBootstrap caps its own retry
// delay at one fifth of this value.
const bootstrapBackOffCeiling = time.Minute * 5
// initialPeerBootstrap loops until the server has at least numTargetPeers
// connected peers, the server is stopped, or s.quit is closed.
//
// Every round after the first waits for a delay that starts at two seconds
// and doubles up to bootstrapBackOffCeiling/5. A round asks the bootstrappers
// for as many addresses as are still missing (excluding the nodes in ignore),
// dials all of them concurrently, and waits for every dial to either finish,
// exceed three seconds, or be interrupted by shutdown.
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
	numTargetPeers uint32, bootstrappers []discovery.NetworkPeerBootstrapper) {

	retryDelay := time.Second * 2
	maxRetryDelay := bootstrapBackOffCeiling / 5

	for attempts := 0; ; attempts++ {
		if s.Stopped() {
			return
		}

		s.mu.RLock()
		connected := uint32(len(s.peersByPub))
		s.mu.RUnlock()

		// Done once the target has been reached.
		if connected >= numTargetPeers {
			return
		}

		// Back off before every round except the very first.
		if attempts > 0 {
			srvrLog.Debugf("Waiting %v before trying to locate "+
				"bootstrap peers (attempt #%v)", retryDelay,
				attempts)

			select {
			case <-time.After(retryDelay):
			case <-s.quit:
				return
			}

			retryDelay *= 2
			if retryDelay > maxRetryDelay {
				retryDelay = maxRetryDelay
			}
		}

		candidates, err := discovery.MultiSourceBootstrap(
			ignore, numTargetPeers-connected, bootstrappers...,
		)
		if err != nil {
			srvrLog.Errorf("Unable to retrieve initial bootstrap "+
				"peers: %v", err)
			continue
		}

		// Dial all candidates in parallel and wait for the whole
		// batch before starting the next round.
		var pending sync.WaitGroup
		pending.Add(len(candidates))
		for _, candidate := range candidates {
			go func(addr *lnwire.NetAddress) {
				defer pending.Done()

				// Buffered so the dialing goroutine can report
				// its result even after we stopped listening.
				result := make(chan error, 1)
				go s.connectToPeer(addr, result)

				select {
				case dialErr := <-result:
					if dialErr == nil {
						return
					}

					srvrLog.Errorf("Unable to connect to "+
						"%v: %v", addr, dialErr)

				case <-time.After(3 * time.Second):
					srvrLog.Tracef("Skipping peer %v due "+
						"to not establishing a "+
						"connection within 3 seconds",
						addr)

				case <-s.quit:
				}
			}(candidate)
		}
		pending.Wait()
	}
}
// createNewHiddenService registers an onion service with the Tor controller
// that maps defaultPeerPort to the ports of the server's listen addresses. It
// then appends the resulting onion address to a freshly signed node
// announcement and stores the updated node as the graph's source node.
//
// Every listen address is assumed to be a *net.TCPAddr; any other type makes
// the type assertion panic.
func (s *server) createNewHiddenService() error {
	// Collect the local ports the onion service should forward to.
	targetPorts := make([]int, len(s.listenAddrs))
	for i, listenAddr := range s.listenAddrs {
		targetPorts[i] = listenAddr.(*net.TCPAddr).Port
	}

	onionCfg := tor.AddOnionConfig{
		VirtualPort: defaultPeerPort,
		TargetPorts: targetPorts,
		Store:       tor.NewOnionFile(cfg.Tor.PrivateKeyPath, 0600),
	}

	// V2 takes precedence if both flags are set; the type is left at its
	// zero value if neither is.
	if cfg.Tor.V2 {
		onionCfg.Type = tor.V2
	} else if cfg.Tor.V3 {
		onionCfg.Type = tor.V3
	}

	addr, err := s.torController.AddOnion(onionCfg)
	if err != nil {
		return err
	}

	// Re-sign our announcement with the onion address appended.
	appendOnionAddr := func(currentAnn *lnwire.NodeAnnouncement) {
		currentAnn.Addresses = append(currentAnn.Addresses, addr)
	}
	newNodeAnn, err := s.genNodeAnnouncement(true, appendOnionAddr)
	if err != nil {
		return fmt.Errorf("unable to generate new node "+
			"announcement: %v", err)
	}

	// Mirror the new announcement into the graph's source node.
	features := lnwire.NewFeatureVector(
		newNodeAnn.Features, lnwire.Features,
	)
	selfNode := &channeldb.LightningNode{
		HaveNodeAnnouncement: true,
		LastUpdate:           time.Unix(int64(newNodeAnn.Timestamp), 0),
		Addresses:            newNodeAnn.Addresses,
		Alias:                newNodeAnn.Alias.String(),
		Features:             features,
		Color:                newNodeAnn.RGBColor,
		AuthSigBytes:         newNodeAnn.Signature.ToSignatureBytes(),
	}
	copy(selfNode.PubKeyBytes[:], s.identityPriv.PubKey().SerializeCompressed())

	err = s.chanDB.ChannelGraph().SetSourceNode(selfNode)
	if err != nil {
		return fmt.Errorf("can't set self node: %v", err)
	}

	return nil
}
// genNodeAnnouncement returns a copy of the server's current node
// announcement. When refresh is true, the given modifiers plus a timestamp
// update are applied to the stored announcement and it is re-signed before the
// copy is taken. The server mutex is held for the duration of the call.
func (s *server) genNodeAnnouncement(refresh bool,
	modifiers ...netann.NodeAnnModifier) (lnwire.NodeAnnouncement, error) {

	s.mu.Lock()
	defer s.mu.Unlock()

	if refresh {
		// The timestamp modifier always runs last.
		modifiers = append(modifiers, netann.NodeAnnSetTimestamp)

		signErr := netann.SignNodeAnnouncement(
			s.nodeSigner, s.identityPriv.PubKey(), s.currentNodeAnn,
			modifiers...,
		)
		if signErr != nil {
			return lnwire.NodeAnnouncement{}, signErr
		}
	}

	return *s.currentNodeAnn, nil
}
// nodeAddresses pairs a node's public key with the network addresses known
// for it. establishPersistentConnections uses it to collect the addresses to
// dial for each node.
type nodeAddresses struct {
// pubKey is the identity key of the node.
pubKey *btcec.PublicKey
// addresses is the list of addresses to try for the node.
addresses []net.Addr
}
// establishPersistentConnections collects the addresses of every node that is
// either stored as a link node or is the counterparty of one of the source
// node's channels in the graph, and then issues a permanent connection request
// for every collected address.
//
// Returns an error if the link nodes, the source node, or the channel
// iteration cannot be read (a missing link-node set or an edge-less graph is
// tolerated).
func (s *server) establishPersistentConnections() error {
// nodeAddrsMap is keyed by the string form of the node's compressed
// public key.
nodeAddrsMap := map[string]*nodeAddresses{}
// Seed the map with the addresses stored for the link nodes.
linkNodes, err := s.chanDB.FetchAllLinkNodes()
if err != nil && err != channeldb.ErrLinkNodesNotFound {
return err
}
for _, node := range linkNodes {
pubStr := string(node.IdentityPub.SerializeCompressed())
nodeAddrs := &nodeAddresses{
pubKey: node.IdentityPub,
addresses: node.Addresses,
}
nodeAddrsMap[pubStr] = nodeAddrs
}
// Next, walk every channel of the source node and merge in the addresses
// the graph has for the channel counterparty.
chanGraph := s.chanDB.ChannelGraph()
sourceNode, err := chanGraph.SourceNode()
if err != nil {
return err
}
selfPub := s.identityPriv.PubKey().SerializeCompressed()
err = sourceNode.ForEachChannel(nil, func(
tx kvdb.ReadTx,
chanInfo *channeldb.ChannelEdgeInfo,
policy, _ *channeldb.ChannelEdgePolicy) error {
// A missing policy is only logged; the channel is still processed.
if policy == nil {
srvrLog.Warnf("No channel policy found for "+
"ChannelPoint(%v): ", chanInfo.ChannelPoint)
}
channelPeer, err := chanInfo.FetchOtherNode(tx, selfPub)
if err != nil {
return fmt.Errorf("unable to fetch channel peer for "+
"ChannelPoint(%v): %v", chanInfo.ChannelPoint,
err)
}
pubStr := string(channelPeer.PubKeyBytes[:])
// addrSet de-duplicates addresses by their string form. TCP addresses
// are always kept, onion addresses only when Tor is active, and any
// other address type is dropped.
addrSet := make(map[string]net.Addr)
for _, addr := range channelPeer.Addresses {
switch addr.(type) {
case *net.TCPAddr:
addrSet[addr.String()] = addr
case *tor.OnionAddr:
if cfg.Tor.Active {
addrSet[addr.String()] = addr
}
}
}
// Merge in the link node's addresses for the same key, applying the
// same filter.
linkNodeAddrs, ok := nodeAddrsMap[pubStr]
if ok {
for _, lnAddress := range linkNodeAddrs.addresses {
switch lnAddress.(type) {
case *net.TCPAddr:
addrSet[lnAddress.String()] = lnAddress
case *tor.OnionAddr:
if cfg.Tor.Active {
addrSet[lnAddress.String()] = lnAddress
}
}
}
}
var addrs []net.Addr
for _, addr := range addrSet {
addrs = append(addrs, addr)
}
// Replace (or create) the map entry with the merged address list.
n := &nodeAddresses{
addresses: addrs,
}
n.pubKey, err = channelPeer.PubKey()
if err != nil {
return err
}
nodeAddrsMap[pubStr] = n
return nil
})
if err != nil && err != channeldb.ErrGraphNoEdgesFound {
return err
}
// The persistent-peer bookkeeping below is mutated under the server
// mutex.
s.mu.Lock()
defer s.mu.Unlock()
// numOutboundConns counts peers (not addresses) processed so far.
var numOutboundConns int
for pubStr, nodeAddr := range nodeAddrsMap {
// Register the peer as persistent with the flag set to false;
// prunePersistentPeerConnection only prunes entries whose flag is false.
s.persistentPeers[pubStr] = false
if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
s.persistentPeersBackoff[pubStr] = cfg.MinBackoff
}
for _, address := range nodeAddr.addresses {
lnAddr := &lnwire.NetAddress{
IdentityKey: nodeAddr.pubKey,
Address: address,
}
srvrLog.Debugf("Attempting persistent connection to "+
"channel peer %v", lnAddr)
connReq := &connmgr.ConnReq{
Addr: lnAddr,
Permanent: true,
}
s.persistentConnReqs[pubStr] = append(
s.persistentConnReqs[pubStr], connReq)
// The first numInstantInitReconnect peers are dialed right away, as is
// every peer when staggering is disabled; the rest are dialed after a
// random delay.
if numOutboundConns < numInstantInitReconnect ||
!cfg.StaggerInitialReconnect {
go s.connMgr.Connect(connReq)
} else {
go s.delayInitialReconnect(connReq)
}
}
numOutboundConns++
}
return nil
}
// delayInitialReconnect waits a random whole number of seconds in the range
// [0, maxInitReconnectDelay) and then hands connReq to the connection manager.
// The wait is abandoned without connecting if s.quit is closed first.
func (s *server) delayInitialReconnect(connReq *connmgr.ConnReq) {
	seconds := prand.Intn(maxInitReconnectDelay)
	wait := time.Duration(seconds) * time.Second

	select {
	case <-s.quit:
	case <-time.After(wait):
		s.connMgr.Connect(connReq)
	}
}
// prunePersistentPeerConnection stops treating the given peer as persistent,
// provided it is tracked in persistentPeers with its flag set to false. In
// that case its persistent-peer and backoff entries are deleted and its
// pending connection requests are cancelled. Peers whose flag is true, and
// peers that are not tracked at all, are left untouched.
func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
	pubKeyStr := string(compressedPubKey[:])

	s.mu.Lock()
	perm, tracked := s.persistentPeers[pubKeyStr]
	if !tracked || perm {
		s.mu.Unlock()
		return
	}

	delete(s.persistentPeers, pubKeyStr)
	delete(s.persistentPeersBackoff, pubKeyStr)
	s.cancelConnReqs(pubKeyStr, nil)
	s.mu.Unlock()

	// Logged after the mutex has been released.
	srvrLog.Infof("Pruned peer %x from persistent connections, "+
		"peer has no open channels", compressedPubKey)
}
// BroadcastMessage sends msgs to every connected peer whose public key is not
// present in skips (a nil skips map excludes nobody). The set of recipients is
// snapshotted under the read lock; the sends themselves run in one goroutine
// per peer, and the call returns once all of them have finished.
//
// The returned error is always nil; the result of the per-peer send is not
// inspected.
func (s *server) BroadcastMessage(skips map[route.Vertex]struct{},
	msgs ...lnwire.Message) error {

	srvrLog.Debugf("Broadcasting %v messages", len(msgs))

	// Snapshot the recipients so the lock is not held while sending.
	s.mu.RLock()
	recipients := make([]*peer, 0, len(s.peersByPub))
	for _, candidate := range s.peersByPub {
		// A lookup in a nil map simply reports "not found".
		if _, skip := skips[candidate.pubKeyBytes]; skip {
			srvrLog.Tracef("Skipping %x in broadcast",
				candidate.pubKeyBytes[:])
			continue
		}

		recipients = append(recipients, candidate)
	}
	s.mu.RUnlock()

	var sends sync.WaitGroup
	for _, recipient := range recipients {
		// Each sender is tracked by the local group (so we can wait
		// for it) and by the server group.
		sends.Add(1)
		s.wg.Add(1)

		go func(p lnpeer.Peer) {
			defer s.wg.Done()
			defer sends.Done()

			p.SendMessageLazy(false, msgs...)
		}(recipient)
	}
	sends.Wait()

	return nil
}
// NotifyWhenOnline delivers the peer identified by peerKey on peerChan. If the
// peer is already in peersByPub it is sent immediately (the send is abandoned
// if s.quit closes first); otherwise peerChan is queued in
// peerConnectedListeners for that key.
//
// The server mutex is held for the whole call, including the send.
func (s *server) NotifyWhenOnline(peerKey [33]byte,
	peerChan chan<- lnpeer.Peer) {

	s.mu.Lock()
	defer s.mu.Unlock()

	pubStr := string(peerKey[:])

	connected, online := s.peersByPub[pubStr]
	if !online {
		// Not connected yet: remember the listener for later.
		s.peerConnectedListeners[pubStr] = append(
			s.peerConnectedListeners[pubStr], peerChan,
		)
		return
	}

	srvrLog.Debugf("Notifying that peer %x is online", peerKey)

	select {
	case peerChan <- connected:
	case <-s.quit:
	}
}
// NotifyWhenOffline returns a channel that is closed once the peer identified
// by peerPubKey is offline. If the peer is not in peersByPub the channel is
// closed before it is returned; otherwise it is queued in
// peerDisconnectedListeners for that key.
func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()

	offline := make(chan struct{})
	key := string(peerPubKey[:])

	if _, online := s.peersByPub[key]; online {
		// Still connected: register the channel for a later close.
		s.peerDisconnectedListeners[key] = append(
			s.peerDisconnectedListeners[key], offline,
		)
		return offline
	}

	srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey)
	close(offline)

	return offline
}
// FindPeer looks up a connected peer by its public key while holding the read
// lock. It returns ErrPeerNotConnected if no such peer is connected.
func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) {
	key := string(peerKey.SerializeCompressed())

	s.mu.RLock()
	defer s.mu.RUnlock()

	return s.findPeerByPubStr(key)
}
// FindPeerByPubStr looks up a connected peer by the string form of its
// compressed public key while holding the read lock. It returns
// ErrPeerNotConnected if no such peer is connected.
func (s *server) FindPeerByPubStr(pubStr string) (*peer, error) {
	s.mu.RLock()
	found, err := s.findPeerByPubStr(pubStr)
	s.mu.RUnlock()

	return found, err
}
// findPeerByPubStr is the lock-free lookup shared by the exported finders. It
// reads peersByPub without taking the mutex itself, so callers must already
// hold it. Returns ErrPeerNotConnected when the key is absent.
func (s *server) findPeerByPubStr(pubStr string) (*peer, error) {
	if found, ok := s.peersByPub[pubStr]; ok {
		return found, nil
	}

	return nil, ErrPeerNotConnected
}
// nextPeerBackoff computes the delay to wait before reconnecting to the peer
// identified by pubStr.
//
//   - No stored backoff: cfg.MinBackoff.
//   - Zero startTime, or a connection that lasted less than
//     defaultStableConnDuration: the stored backoff run through
//     computeNextBackoff.
//   - Otherwise: that next backoff reduced by the connection's duration, but
//     never below cfg.MinBackoff.
//
// persistentPeersBackoff is read without locking, so the caller must hold the
// server mutex.
func (s *server) nextPeerBackoff(pubStr string,
	startTime time.Time) time.Duration {

	current, tracked := s.persistentPeersBackoff[pubStr]
	if !tracked {
		return cfg.MinBackoff
	}

	if startTime.IsZero() {
		return computeNextBackoff(current)
	}

	uptime := time.Since(startTime)
	if uptime < defaultStableConnDuration {
		return computeNextBackoff(current)
	}

	// The connection was stable, so credit its duration against the next
	// backoff, clamping at the configured minimum.
	relaxed := computeNextBackoff(current) - uptime
	if relaxed <= cfg.MinBackoff {
		return cfg.MinBackoff
	}

	return relaxed
}
// shouldDropLocalConnection reports whether the local key's compressed
// serialization sorts strictly after the remote key's in byte order. The
// connection handlers use it as a deterministic tie-breaker when two
// connections to the same peer exist.
func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
	ordering := bytes.Compare(
		local.SerializeCompressed(), remote.SerializeCompressed(),
	)

	return ordering > 0
}
// InboundPeerConnected handles a newly accepted inbound connection. Depending
// on the connection state already tracked for the remote key, the new
// connection is closed, finalized immediately, or scheduled to be finalized
// once the existing connection to the same peer has been torn down.
//
// conn must be a *brontide.Conn; any other type makes the assertion panic.
func (s *server) InboundPeerConnected(conn net.Conn) {
// Do nothing while shutting down. Note that conn is not closed on this
// path.
if s.Stopped() {
return
}
nodePub := conn.(*brontide.Conn).RemotePub()
pubStr := string(nodePub.SerializeCompressed())
s.mu.Lock()
defer s.mu.Unlock()
// An outbound connection to this peer already exists: drop the new one.
if _, ok := s.outboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have outbound connection for %x, "+
"ignoring inbound connection",
nodePub.SerializeCompressed())
conn.Close()
return
}
// A replacement connection for this peer is already waiting to be
// finalized: drop the new one.
if _, ok := s.scheduledPeerConnection[pubStr]; ok {
srvrLog.Debugf("Ignoring connection, peer already scheduled")
conn.Close()
return
}
srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
connectedPeer, err := s.findPeerByPubStr(pubStr)
switch err {
// Not connected yet: cancel pending outbound attempts and finalize.
case ErrPeerNotConnected:
s.cancelConnReqs(pubStr, nil)
s.peerConnected(conn, nil, true)
// Already connected: decide which connection survives.
case nil:
localPub := s.identityPriv.PubKey()
// Keep the existing outbound connection when the key-ordering
// tie-breaker does not ask us to drop it.
if !connectedPeer.inbound &&
!shouldDropLocalConnection(localPub, nodePub) {
srvrLog.Warnf("Received inbound connection from "+
"peer %v, but already have outbound "+
"connection, dropping conn", connectedPeer)
conn.Close()
return
}
srvrLog.Debugf("Disconnecting stale connection to %v",
connectedPeer)
s.cancelConnReqs(pubStr, nil)
// Tear down the old peer and mark its termination as ignored;
// peerTerminationWatcher runs the scheduled callback for such peers
// instead of attempting a reconnect.
s.removePeer(connectedPeer)
s.ignorePeerTermination[connectedPeer] = struct{}{}
s.scheduledPeerConnection[pubStr] = func() {
s.peerConnected(conn, nil, true)
}
}
}
// OutboundPeerConnected handles a newly established outbound connection.
// connReq is the connection-manager request that produced conn and may be nil
// (connectToPeer passes nil). Depending on the state already tracked for the
// remote key, the connection is closed, finalized immediately, or scheduled to
// be finalized once the existing connection to the same peer has been torn
// down. Whenever the connection is rejected and connReq is non-nil, the
// request is also removed from the connection manager.
//
// conn must be a *brontide.Conn; any other type makes the assertion panic.
func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
// Do nothing while shutting down. Note that conn is not closed on this
// path.
if s.Stopped() {
return
}
nodePub := conn.(*brontide.Conn).RemotePub()
pubStr := string(nodePub.SerializeCompressed())
s.mu.Lock()
defer s.mu.Unlock()
// An inbound connection to this peer already exists: drop the new one.
if _, ok := s.inboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have inbound connection for %x, "+
"ignoring outbound connection",
nodePub.SerializeCompressed())
if connReq != nil {
s.connMgr.Remove(connReq.ID())
}
conn.Close()
return
}
// A request-backed connection whose peer no longer has any tracked
// persistent requests is treated as cancelled.
if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
srvrLog.Debugf("Ignoring canceled outbound connection")
s.connMgr.Remove(connReq.ID())
conn.Close()
return
}
// A replacement connection for this peer is already waiting to be
// finalized: drop the new one.
if _, ok := s.scheduledPeerConnection[pubStr]; ok {
srvrLog.Debugf("Ignoring connection, peer already scheduled")
if connReq != nil {
s.connMgr.Remove(connReq.ID())
}
conn.Close()
return
}
srvrLog.Infof("Established connection to: %x@%v", pubStr,
conn.RemoteAddr())
// Cancel every other pending request for this peer, sparing the one
// that produced this connection.
if connReq != nil {
ignore := connReq.ID()
s.cancelConnReqs(pubStr, &ignore)
} else {
s.cancelConnReqs(pubStr, nil)
}
connectedPeer, err := s.findPeerByPubStr(pubStr)
switch err {
// Not connected yet: finalize right away.
case ErrPeerNotConnected:
s.peerConnected(conn, connReq, false)
// Already connected: decide which connection survives.
case nil:
localPub := s.identityPriv.PubKey()
// Keep the existing inbound connection when the key-ordering
// tie-breaker asks us to drop our own (outbound) one.
if connectedPeer.inbound &&
shouldDropLocalConnection(localPub, nodePub) {
srvrLog.Warnf("Established outbound connection to "+
"peer %v, but already have inbound "+
"connection, dropping conn", connectedPeer)
if connReq != nil {
s.connMgr.Remove(connReq.ID())
}
conn.Close()
return
}
srvrLog.Debugf("Disconnecting stale connection to %v",
connectedPeer)
// Tear down the old peer and mark its termination as ignored;
// peerTerminationWatcher runs the scheduled callback for such peers
// instead of attempting a reconnect.
s.removePeer(connectedPeer)
s.ignorePeerTermination[connectedPeer] = struct{}{}
s.scheduledPeerConnection[pubStr] = func() {
s.peerConnected(conn, connReq, false)
}
}
}
// UnassignedConnID is the connection ID that cancelConnReqs treats as "not
// yet assigned": requests whose ID equals it are skipped rather than removed
// from the connection manager.
const UnassignedConnID uint64 = 0
// cancelConnReqs cancels everything pending for the peer identified by pubStr:
// it closes and forgets the peer's retry-cancel channel, removes each of its
// persistent connection requests from the connection manager, and finally
// forgets the requests themselves.
//
// Requests whose ID is UnassignedConnID, and the request whose ID equals
// *skip (when skip is non-nil), are not removed from the connection manager.
// The maps are accessed without locking, so the caller must hold the server
// mutex.
func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
	// Abort a scheduled reconnect, if there is one.
	if cancel, pending := s.persistentRetryCancels[pubStr]; pending {
		close(cancel)
		delete(s.persistentRetryCancels, pubStr)
	}

	reqs, tracked := s.persistentConnReqs[pubStr]
	if !tracked {
		return
	}

	for _, req := range reqs {
		id := req.ID()

		spared := skip != nil && id == *skip
		if id == UnassignedConnID || spared {
			continue
		}

		s.connMgr.Remove(id)
	}

	delete(s.persistentConnReqs, pubStr)
}
// peerConnected finalizes a connection: it builds a peer object for conn,
// registers it with the server via addPeer, and starts peerInitializer for it
// in a new goroutine tracked by s.wg.
//
// An error buffer stored for the same key in peerErrors is reused if present;
// otherwise a new one is created. If the buffer or the peer cannot be
// created, the failure is logged and the method returns without registering
// anything.
//
// conn must be a *brontide.Conn. Server maps are accessed without locking, so
// the caller must hold the server mutex.
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
	inbound bool) {

	brontideConn := conn.(*brontide.Conn)
	remoteAddr := conn.RemoteAddr()
	remotePub := brontideConn.RemotePub()

	srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v",
		remotePub.SerializeCompressed(), remoteAddr, inbound)

	peerAddr := &lnwire.NetAddress{
		IdentityKey: remotePub,
		Address:     remoteAddr,
		ChainNet:    activeNetParams.Net,
	}

	// Feature sets handed to the new peer.
	initFeatures := s.featureMgr.Get(feature.SetInit)
	legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)

	// Reuse the error history of an earlier connection when available.
	pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
	errBuffer, haveBuffer := s.peerErrors[pkStr]
	if !haveBuffer {
		var bufErr error
		errBuffer, bufErr = queue.NewCircularBuffer(errorBufferSize)
		if bufErr != nil {
			srvrLog.Errorf("unable to create peer %v", bufErr)
			return
		}
	}

	newConn, err := newPeer(
		conn, connReq, s, peerAddr, inbound, initFeatures,
		legacyFeatures, cfg.ChanEnableTimeout,
		defaultOutgoingCltvRejectDelta, errBuffer,
	)
	if err != nil {
		srvrLog.Errorf("unable to create peer %v", err)
		return
	}

	s.addPeer(newConn)

	// The buffer now belongs to the live peer.
	delete(s.peerErrors, pkStr)

	s.wg.Add(1)
	go s.peerInitializer(newConn)
}
// addPeer registers p in peersByPub and in either inboundPeers or
// outboundPeers, then notifies the peer notifier that the peer is online.
//
// A nil peer is ignored. If the server is stopping, the peer is disconnected
// with ErrServerShuttingDown instead of being registered. The maps are
// written without locking, so the caller must hold the server mutex.
func (s *server) addPeer(p *peer) {
	if p == nil {
		return
	}

	if s.Stopped() {
		p.Disconnect(ErrServerShuttingDown)
		return
	}

	serialized := p.addr.IdentityKey.SerializeCompressed()
	key := string(serialized)

	s.peersByPub[key] = p
	switch {
	case p.inbound:
		s.inboundPeers[key] = p
	default:
		s.outboundPeers[key] = p
	}

	var notifyKey [33]byte
	copy(notifyKey[:], serialized)
	s.peerNotifier.NotifyPeerOnline(notifyKey)
}
// peerInitializer starts a freshly registered peer. It launches the peer's
// termination watcher, starts the peer, signals the watcher via the ready
// channel, and then delivers the peer to every listener registered through
// NotifyWhenOnline for its key.
//
// If the peer fails to start it is disconnected and no listener is notified.
// The method is expected to run as a goroutine tracked by s.wg.
func (s *server) peerInitializer(p *peer) {
	defer s.wg.Done()

	if s.Stopped() {
		return
	}

	// The watcher is started before the peer so that a disconnect during
	// start-up is still observed. ready is closed only after a successful
	// start.
	ready := make(chan struct{})
	s.wg.Add(1)
	go s.peerTerminationWatcher(p, ready)

	startErr := p.Start()
	if startErr != nil {
		p.Disconnect(fmt.Errorf("unable to start peer: %v", startErr))
		return
	}
	close(ready)

	key := string(p.addr.IdentityKey.SerializeCompressed())

	s.mu.Lock()
	defer s.mu.Unlock()

	srvrLog.Debugf("Notifying that peer %v is online", p)

	listeners := s.peerConnectedListeners[key]
	for _, listener := range listeners {
		select {
		case listener <- p:
		case <-s.quit:
			// Shutting down: leave the remaining listeners
			// registered and bail out.
			return
		}
	}
	delete(s.peerConnectedListeners, key)
}
// peerTerminationWatcher blocks until p has disconnected and then cleans up
// after it: funding reservations, gossip sync state, switch links, offline
// listeners and the server's peer maps. If the peer is a persistent peer with
// no pending connection requests, a reconnection attempt is scheduled after a
// backoff.
//
// The method is expected to run as a goroutine tracked by s.wg.
func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
defer s.wg.Done()
p.WaitForDisconnect(ready)
srvrLog.Debugf("Peer %v has been disconnected", p)
// No cleanup is performed while the server is shutting down.
if s.Stopped() {
return
}
s.fundingMgr.CancelPeerReservations(p.PubKey())
pubKey := p.addr.IdentityKey
s.authGossiper.PruneSyncState(p.PubKey())
// Remove the peer's links from the switch. A lookup error other than
// ErrNoLinksFound is logged only; whatever links were returned are still
// removed.
links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
if err != nil && err != htlcswitch.ErrNoLinksFound {
srvrLog.Errorf("Unable to get channel links for %v: %v", p, err)
}
for _, link := range links {
p.server.htlcSwitch.RemoveLink(link.ChanID())
}
s.mu.Lock()
defer s.mu.Unlock()
// Wake up everyone waiting via NotifyWhenOffline.
srvrLog.Debugf("Notifying that peer %v is offline", p)
pubStr := string(pubKey.SerializeCompressed())
for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
close(offlineChan)
}
delete(s.peerDisconnectedListeners, pubStr)
// Peers marked in ignorePeerTermination were already removed by whoever
// marked them. For those, only run the scheduled replacement-connection
// callback (if any) and skip the reconnect logic below.
if _, ok := s.ignorePeerTermination[p]; ok {
delete(s.ignorePeerTermination, p)
// These shadow the outer pubKey/pubStr with values derived from
// p.PubKey().
pubKey := p.PubKey()
pubStr := string(pubKey[:])
connCallback, ok := s.scheduledPeerConnection[pubStr]
if ok {
delete(s.scheduledPeerConnection, pubStr)
connCallback()
}
return
}
s.removePeer(p)
// Only persistent peers are reconnected.
_, ok := s.persistentPeers[pubStr]
if ok {
// Connection requests are already pending for this peer, so nothing
// more needs to be scheduled.
if _, ok := s.persistentConnReqs[pubStr]; ok {
return
}
// For an inbound peer, prefer the address it advertises in the graph
// over the address the connection came from.
if p.inbound {
advertisedAddr, err := s.fetchNodeAdvertisedAddr(pubKey)
switch {
case err == nil:
p.addr.Address = advertisedAddr
// Without an advertised address: when no Tor controller is configured,
// fall through and retry the existing address; otherwise give up.
case err == errNoAdvertisedAddr:
if s.torController == nil {
break
}
srvrLog.Debugf("Ignoring reconnection attempt "+
"to inbound peer %v without "+
"advertised address", p)
return
// Any other lookup error is logged and the existing address is used.
default:
srvrLog.Errorf("Unable to retrieve advertised "+
"address for node %x: %v", p.PubKey(),
err)
}
}
connReq := &connmgr.ConnReq{
Addr: p.addr,
Permanent: true,
}
s.persistentConnReqs[pubStr] = append(
s.persistentConnReqs[pubStr], connReq)
// Compute and record the backoff for this reconnection attempt.
backoff := s.nextPeerBackoff(pubStr, p.StartTime())
s.persistentPeersBackoff[pubStr] = backoff
// Reuse the peer's retry-cancel channel or create one; cancelConnReqs
// closes it to abort the wait below.
cancelChan, ok := s.persistentRetryCancels[pubStr]
if !ok {
cancelChan = make(chan struct{})
s.persistentRetryCancels[pubStr] = cancelChan
}
// Wait out the backoff in the background, then hand the request to the
// connection manager unless cancelled or shutting down.
go func() {
srvrLog.Debugf("Scheduling connection re-establishment to "+
"persistent peer %v in %s", p, backoff)
select {
case <-time.After(backoff):
case <-cancelChan:
return
case <-s.quit:
return
}
srvrLog.Debugf("Attempting to re-establish persistent "+
"connection to peer %v", p)
s.connMgr.Connect(connReq)
}()
}
}
// removePeer disconnects p, removes its connection request (if any) from the
// connection manager, and deletes it from the server's peer maps. A non-empty
// error buffer is stashed in peerErrors under the peer's key, and the peer
// notifier is told the peer is offline.
//
// A nil peer is ignored. If the server is stopping, only the disconnect and
// the connection-request removal are performed. The maps are written without
// locking, so the caller must hold the server mutex.
func (s *server) removePeer(p *peer) {
	if p == nil {
		return
	}

	srvrLog.Debugf("removing peer %v", p)

	p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p))

	if req := p.connReq; req != nil {
		s.connMgr.Remove(req.ID())
	}

	// Leave the bookkeeping untouched during shutdown.
	if s.Stopped() {
		return
	}

	serialized := p.addr.IdentityKey.SerializeCompressed()
	key := string(serialized)

	delete(s.peersByPub, key)
	switch {
	case p.inbound:
		delete(s.inboundPeers, key)
	default:
		delete(s.outboundPeers, key)
	}

	// Keep the error history only if there is something in it.
	if p.errorBuffer.Total() > 0 {
		s.peerErrors[key] = p.errorBuffer
	}

	var notifyKey [33]byte
	copy(notifyKey[:], serialized)
	s.peerNotifier.NotifyPeerOffline(notifyKey)
}
// openChanReq carries the parameters of a channel-open request. OpenChannel
// fills in the updates and err channels and hands the request to the funding
// manager. Only the fields whose use is visible in this file are described in
// detail; the remaining fields are passed through to the funding workflow.
type openChanReq struct {
// targetPubkey identifies the peer to open the channel with; OpenChannel
// uses it to look up the connected peer.
targetPubkey *btcec.PublicKey
chainHash chainhash.Hash
subtractFees bool
localFundingAmt btcutil.Amount
pushAmt lnwire.MilliSatoshi
// fundingFeePerKw is the fee rate for the funding transaction. When it is
// zero, OpenChannel replaces it with the fee estimator's result for a
// confirmation target of 6.
fundingFeePerKw chainfee.SatPerKWeight
private bool
minHtlcIn lnwire.MilliSatoshi
remoteCsvDelay uint16
minConfs int32
shutdownScript lnwire.DeliveryAddress
chanFunder chanfunding.Assembler
pendingChanID [32]byte
// updates is created by OpenChannel with a buffer of 2 and returned to
// the caller.
updates chan *lnrpc.OpenStatusUpdate
// err is created by OpenChannel with a buffer of 1 and returned to the
// caller; OpenChannel writes its own early failures to it.
err chan error
}
// ConnectToPeer initiates an outbound connection to addr.
//
// If the peer is already connected, an *errPeerAlreadyConnected is returned.
// With perm set, the peer is recorded as a persistent peer, a permanent
// connection request is queued with the connection manager in the background,
// and nil is returned immediately. Without perm, the call dials synchronously
// and returns the dial result, or ErrServerShuttingDown if s.quit closes
// first.
func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
	targetPub := string(addr.IdentityKey.SerializeCompressed())

	s.mu.Lock()

	// Refuse to connect twice to the same peer.
	if connected, err := s.findPeerByPubStr(targetPub); err == nil {
		s.mu.Unlock()
		return &errPeerAlreadyConnected{peer: connected}
	}

	// Pending persistent requests are only worth a warning.
	if reqs, pending := s.persistentConnReqs[targetPub]; pending {
		srvrLog.Warnf("Already have %d persistent connection "+
			"requests for %x@%v, connecting anyway.", len(reqs),
			targetPub, addr)
	}

	srvrLog.Debugf("Connecting to %x@%v", targetPub, addr)

	if !perm {
		// One-shot connection: dial without holding the mutex and
		// report the outcome to the caller.
		s.mu.Unlock()

		errChan := make(chan error, 1)
		s.connectToPeer(addr, errChan)

		select {
		case dialErr := <-errChan:
			return dialErr
		case <-s.quit:
			return ErrServerShuttingDown
		}
	}

	// Persistent connection: record the bookkeeping, then let the
	// connection manager do the dialing in the background.
	connReq := &connmgr.ConnReq{
		Addr:      addr,
		Permanent: true,
	}

	s.persistentPeers[targetPub] = true
	if _, known := s.persistentPeersBackoff[targetPub]; !known {
		s.persistentPeersBackoff[targetPub] = cfg.MinBackoff
	}
	s.persistentConnReqs[targetPub] = append(
		s.persistentConnReqs[targetPub], connReq,
	)
	s.mu.Unlock()

	go s.connMgr.Connect(connReq)

	return nil
}
// connectToPeer dials addr and reports the outcome on errChan. On failure the
// error is logged and sent on errChan (the send is abandoned if s.quit closes
// first). On success errChan is closed, so a receiver observes a nil error,
// and the connection is handed to OutboundPeerConnected with a nil request.
func (s *server) connectToPeer(addr *lnwire.NetAddress, errChan chan<- error) {
	conn, err := brontide.Dial(s.identityPriv, addr, cfg.net.Dial)
	if err == nil {
		close(errChan)
		s.OutboundPeerConnected(nil, conn)
		return
	}

	srvrLog.Errorf("Unable to connect to %v: %v", addr, err)

	select {
	case errChan <- err:
	case <-s.quit:
	}
}
// DisconnectPeer disconnects the peer with the given public key and stops
// treating it as persistent: its pending connection requests are cancelled,
// its persistent-peer and backoff entries are deleted, and its termination is
// marked as ignored so that no reconnect is attempted.
//
// An error is returned if the peer is not connected.
func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
	serialized := pubKey.SerializeCompressed()
	key := string(serialized)

	s.mu.Lock()
	defer s.mu.Unlock()

	target, err := s.findPeerByPubStr(key)
	if err == ErrPeerNotConnected {
		return fmt.Errorf("peer %x is not connected", serialized)
	}

	srvrLog.Infof("Disconnecting from %v", target)

	// Forget everything that could bring the connection back.
	s.cancelConnReqs(key, nil)
	delete(s.persistentPeers, key)
	delete(s.persistentPeersBackoff, key)

	s.removePeer(target)

	// Tell the termination watcher not to reconnect.
	s.ignorePeerTermination[target] = struct{}{}

	return nil
}
// OpenChannel starts the funding workflow for req and returns the request's
// update and error channels, which it creates with buffers of 2 and 1.
//
// An error is written to the error channel, and the workflow is not started,
// if the target peer is not connected, the peer quits or the server shuts
// down before the peer signals that it is active, or no fee rate was given
// and the fee estimate fails. When req.fundingFeePerKw is zero it is replaced
// by the estimator's rate for a confirmation target of 6.
func (s *server) OpenChannel(
	req *openChanReq) (chan *lnrpc.OpenStatusUpdate, chan error) {

	req.updates = make(chan *lnrpc.OpenStatusUpdate, 2)
	req.err = make(chan error, 1)

	pubKeyBytes := req.targetPubkey.SerializeCompressed()

	// The peer must be connected.
	s.mu.RLock()
	target, online := s.peersByPub[string(pubKeyBytes)]
	s.mu.RUnlock()
	if !online {
		req.err <- fmt.Errorf("peer %x is not online", pubKeyBytes)
		return req.updates, req.err
	}

	// Wait for the peer to become active before funding.
	select {
	case <-target.activeSignal:

	case <-target.quit:
		req.err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
		return req.updates, req.err

	case <-s.quit:
		req.err <- ErrServerShuttingDown
		return req.updates, req.err
	}

	// Fall back to an estimated fee rate when none was supplied.
	if req.fundingFeePerKw == 0 {
		feeRate, err := s.cc.feeEstimator.EstimateFeePerKW(6)
		if err != nil {
			req.err <- err
			return req.updates, req.err
		}
		req.fundingFeePerKw = feeRate
	}

	go s.fundingMgr.initFundingWorkflow(target, req)

	return req.updates, req.err
}
// Peers returns a snapshot slice of all currently connected peers, taken
// under the read lock. The order of the slice is unspecified because it
// follows map iteration order.
func (s *server) Peers() []*peer {
	s.mu.RLock()
	defer s.mu.RUnlock()

	snapshot := make([]*peer, len(s.peersByPub))
	next := 0
	for _, connected := range s.peersByPub {
		snapshot[next] = connected
		next++
	}

	return snapshot
}
// parseHexColor converts a color string into a color.RGBA whose alpha channel
// is left at zero. The string must match validColorRegexp; the error message
// describes the expected form as #RRGGBB. The leading character is stripped
// and the remainder is hex-decoded into the R, G and B components.
func parseHexColor(colorStr string) (color.RGBA, error) {
	var parsed color.RGBA

	if !validColorRegexp.MatchString(colorStr) {
		return parsed, errors.New("Color must be specified " +
			"using a hexadecimal value in the form #RRGGBB")
	}

	rgb, err := hex.DecodeString(colorStr[1:])
	if err != nil {
		return parsed, err
	}

	parsed.R, parsed.G, parsed.B = rgb[0], rgb[1], rgb[2]

	return parsed, nil
}
// computeNextBackoff doubles currBackoff, caps the result at cfg.MaxBackoff,
// and then adds a random wiggle of up to +/- 5% (the margin is 10% of the
// capped value, centered on zero).
//
// The capped value is returned without any wiggle when the margin is not
// positive (which would otherwise make the random draw panic) or when the
// random source fails.
func computeNextBackoff(currBackoff time.Duration) time.Duration {
	// Double the backoff, capped at the configured maximum.
	nextBackoff := 2 * currBackoff
	if nextBackoff > cfg.MaxBackoff {
		nextBackoff = cfg.MaxBackoff
	}

	// The wiggle is drawn from [0, margin) and then shifted down by half
	// the margin.
	margin := nextBackoff / 10

	// rand.Int panics if its max argument is <= 0, which happens for a
	// zero, negative or sub-10ns backoff. There is nothing to randomize in
	// that case.
	if margin <= 0 {
		return nextBackoff
	}

	var wiggle big.Int
	wiggle.SetUint64(uint64(margin))
	if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
		// Randomness is best-effort: fall back to the plain value.
		return nextBackoff
	}

	return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
}
// errNoAdvertisedAddr is returned by fetchNodeAdvertisedAddr when the node
// found in the channel graph has no addresses.
var errNoAdvertisedAddr = errors.New("no advertised address found")
// fetchNodeAdvertisedAddr looks the node up in the channel graph and returns
// the first address stored for it. It returns errNoAdvertisedAddr when the
// node has no addresses, and passes through any error from building the
// vertex or fetching the node.
func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error) {
	vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
	if err != nil {
		return nil, err
	}

	graph := s.chanDB.ChannelGraph()
	node, err := graph.FetchLightningNode(nil, vertex)
	if err != nil {
		return nil, err
	}

	addrs := node.Addresses
	if len(addrs) == 0 {
		return nil, errNoAdvertisedAddr
	}

	return addrs[0], nil
}
// fetchLastChanUpdate returns a closure that, given a short channel ID, loads
// the channel and its two edges from the router and extracts a channel update
// for it using the server's own public key. The key is serialized once, when
// the closure is created.
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
	*lnwire.ChannelUpdate, error) {

	selfKey := s.identityPriv.PubKey().SerializeCompressed()

	lookup := func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
		info, edge1, edge2, err := s.chanRouter.GetChannelByID(cid)
		if err != nil {
			return nil, err
		}

		return netann.ExtractChannelUpdate(selfKey, info, edge1, edge2)
	}

	return lookup
}
// applyChannelUpdate submits update to the gossiper as a local announcement
// made with the server's own public key and waits for the result. It returns
// the gossiper's result, or ErrServerShuttingDown if s.quit closes first.
func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error {
	result := s.authGossiper.ProcessLocalAnnouncement(
		update, s.identityPriv.PubKey(),
	)

	select {
	case <-s.quit:
		return ErrServerShuttingDown
	case err := <-result:
		return err
	}
}
// newSweepPkScriptGen returns a generator closure. Each call to the closure
// requests a new witness-pubkey address from the wallet (with the second
// NewAddress argument set to false) and returns the pay-to-address script for
// it.
func newSweepPkScriptGen(
	wallet lnwallet.WalletController) func() ([]byte, error) {

	generate := func() ([]byte, error) {
		addr, err := wallet.NewAddress(lnwallet.WitnessPubKey, false)
		if err != nil {
			return nil, err
		}

		return txscript.PayToAddrScript(addr)
	}

	return generate
}