package lnd
import (
"bytes"
"container/list"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/connmgr"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/buffer"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/feature"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/ticker"
)
// Timing and sizing parameters used by the peer's goroutines.
const (
	// pingInterval is the period of the ticker that makes pingHandler
	// queue a Ping message.
	pingInterval = time.Minute

	// idleTimeout is how long the read and write handlers wait without
	// completing an operation before disconnecting the peer.
	idleTimeout = 5 * time.Minute

	// writeMessageTimeout is the write deadline applied before each flush
	// to the connection.
	writeMessageTimeout = 5 * time.Second

	// readMessageTimeout is the read deadline applied while reading a
	// message body once its header has arrived.
	readMessageTimeout = 5 * time.Second

	// handshakeTimeout bounds how long Start waits for the remote peer's
	// first (init) message.
	handshakeTimeout = 15 * time.Second

	// outgoingQueueLen is not referenced in the visible part of this file;
	// presumably a buffer length for outgoing messages -- TODO confirm.
	outgoingQueueLen = 50

	// errorBufferSize is not referenced in the visible part of this file;
	// presumably the capacity of a peer's error buffer -- TODO confirm.
	errorBufferSize = 10
)
// outgoingMsg is one message travelling through the peer's outgoing pipeline
// (queue -> queueHandler -> writeHandler).
//
// The field order is significant: the type is built with a positional
// composite literal elsewhere in this file.
type outgoingMsg struct {
	// priority selects the priority list (true) or the lazy list (false)
	// inside queueHandler.
	priority bool

	// msg is the wire message to write.
	msg lnwire.Message

	// errChan, when non-nil, receives the result of the write attempt.
	errChan chan error
}
// newChannelMsg is the request consumed by channelManager from the
// peer's newChannels channel when a channel should be added to the peer.
type newChannelMsg struct {
// channel is the database state of the channel to add.
channel *channeldb.OpenChannel
// err reports the outcome to the requester: channelManager closes it on
// success and sends an error on it on failure.
err chan error
}
// closeMsg pairs a channel-close related wire message (Shutdown or
// ClosingSigned, as sent by readHandler) with the channel it targets.
// The field order matters: readHandler builds it positionally.
type closeMsg struct {
// cid is the ID of the channel the message refers to.
cid lnwire.ChannelID
// msg is the close message itself.
msg lnwire.Message
}
// pendingUpdate identifies a transaction output by txid and output index.
// NOTE(review): not used in the visible part of this file; presumably
// describes a pending closing transaction -- confirm against its callers.
type pendingUpdate struct {
// Txid is the transaction ID as raw bytes.
Txid []byte
// OutputIndex is the index of the output within that transaction.
OutputIndex uint32
}
// channelCloseUpdate carries a closing transaction ID and a success flag.
// NOTE(review): not used in the visible part of this file; presumably the
// final update of a channel close -- confirm against its callers.
type channelCloseUpdate struct {
// ClosingTxid is the closing transaction ID as raw bytes.
ClosingTxid []byte
// Success reports whether the close succeeded.
Success bool
}
// timestampedError is the entry type storeError adds to the peer's error
// buffer: an error together with the time it was recorded.
type timestampedError struct {
// error is the recorded error (the field is named after its type).
error error
// timestamp is when the error was stored.
timestamp time.Time
}
// peer holds the state of one connection to a remote node: the
// connection itself, the channels shared with that node, and the queues and
// signal channels used by the goroutines launched in Start.
type peer struct {
// started is incremented atomically by Start; only the first call
// proceeds.
started int32
// disconnect is set to 1 atomically by the first Disconnect call and is
// polled by the read and write paths.
disconnect int32
// bytesReceived is updated atomically with the size of every message
// body read.
bytesReceived uint64
// bytesSent is updated atomically with the byte count of every flush.
bytesSent uint64
// pingTime is the last measured ping round trip in microseconds,
// stored atomically when a Pong arrives.
pingTime int64
// pingLastSend is the UnixNano time at which writeHandler last
// dequeued a Ping for sending; accessed atomically.
pingLastSend int64
// connReq is stored by newPeer; it is not otherwise used in the visible
// part of this file.
connReq *connmgr.ConnReq
// conn is the connection to the peer. Reading and writing messages
// require it to be a *brontide.Conn.
conn net.Conn
// addr is the peer's network address, including its identity key.
addr *lnwire.NetAddress
// pubKeyBytes is the compressed serialization of addr.IdentityKey.
pubKeyBytes [33]byte
// activeSignal is closed by Start once the handler goroutines have been
// launched.
activeSignal chan struct{}
// startTime is set by Start just before the handler goroutines launch.
startTime time.Time
// inbound is stored by newPeer; presumably true when the remote side
// initiated the connection -- TODO confirm.
inbound bool
// sendQueue carries messages from queueHandler to writeHandler.
sendQueue chan outgoingMsg
// outgoingQueue carries messages from queue to queueHandler.
outgoingQueue chan outgoingMsg
// activeChanMtx guards activeChannels; addedChannels is also accessed
// while holding it.
activeChanMtx sync.RWMutex
// activeChannels maps channel IDs to their channel state machines. A
// nil value marks a channel that is still pending.
activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel
// addedChannels records channels added through newChannels after the
// peer started; reenableActiveChannels skips them.
addedChannels map[lnwire.ChannelID]struct{}
// newChannels delivers requests to add a channel to channelManager.
newChannels chan *newChannelMsg
// activeMsgStreams holds the per-channel message streams created
// lazily by readHandler.
activeMsgStreams map[lnwire.ChannelID]*msgStream
// activeChanCloses holds the in-progress cooperative close state
// machines, keyed by channel ID.
activeChanCloses map[lnwire.ChannelID]*channelCloser
// localCloseChanReqs delivers locally initiated close requests to
// channelManager.
localCloseChanReqs chan *htlcswitch.ChanClose
// linkFailures delivers link failure reports to channelManager.
linkFailures chan linkFailureReport
// chanCloseMsgs delivers Shutdown and ClosingSigned messages from
// readHandler to channelManager.
chanCloseMsgs chan *closeMsg
// chanActiveTimeout is how long channelManager waits after starting
// before calling reenableActiveChannels.
chanActiveTimeout time.Duration
// server is the owning server, used to reach shared subsystems.
server *server
// features and legacyFeatures are stored by newPeer; presumably the
// local feature vectors advertised in the init message -- TODO confirm.
features *lnwire.FeatureVector
legacyFeatures *lnwire.FeatureVector
// outgoingCltvRejectDelta is passed to every link's configuration.
outgoingCltvRejectDelta uint32
// remoteFeatures is consulted by initGossipSync; it is not assigned in
// the visible part of this file.
remoteFeatures *lnwire.FeatureVector
// resentChanSyncMsg is created by newPeer; presumably tracks channels
// whose channel sync message was already resent -- TODO confirm.
resentChanSyncMsg map[lnwire.ChannelID]struct{}
// errorBuffer stores timestamped errors recorded by storeError.
errorBuffer *queue.CircularBuffer
// writePool and readPool are the server-wide buffer pools used when
// writing and reading messages.
writePool *pool.Write
readPool *pool.Read
// queueQuit is created by newPeer; it is not used in the visible part
// of this file.
queueQuit chan struct{}
// quit is closed by Disconnect to stop all of the peer's goroutines.
quit chan struct{}
// wg tracks the peer's goroutines; WaitForDisconnect waits on it.
wg sync.WaitGroup
}
// Compile-time assertion that *peer implements lnpeer.Peer.
var _ lnpeer.Peer = (*peer)(nil)
// newPeer builds a peer around an established connection. It only allocates
// state: no goroutines are started and nothing is sent until Start is
// called.
//
// The read and write pools are taken from server, and the compressed
// serialization of addr.IdentityKey is cached in pubKeyBytes. The returned
// error is always nil.
func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
	addr *lnwire.NetAddress, inbound bool,
	features, legacyFeatures *lnwire.FeatureVector,
	chanActiveTimeout time.Duration,
	outgoingCltvRejectDelta uint32,
	errBuffer *queue.CircularBuffer) (*peer, error) {

	identityKey := addr.IdentityKey

	p := &peer{
		// Connection details.
		conn:    conn,
		connReq: connReq,
		addr:    addr,
		inbound: inbound,
		server:  server,

		// Negotiation and policy parameters.
		features:                features,
		legacyFeatures:          legacyFeatures,
		outgoingCltvRejectDelta: outgoingCltvRejectDelta,
		chanActiveTimeout:       chanActiveTimeout,

		// Outgoing message pipeline.
		sendQueue:     make(chan outgoingMsg),
		outgoingQueue: make(chan outgoingMsg),

		// Channel bookkeeping.
		activeChannels:    make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
		addedChannels:     make(map[lnwire.ChannelID]struct{}),
		activeMsgStreams:  make(map[lnwire.ChannelID]*msgStream),
		activeChanCloses:  make(map[lnwire.ChannelID]*channelCloser),
		resentChanSyncMsg: make(map[lnwire.ChannelID]struct{}),

		// Requests handled by channelManager.
		newChannels:        make(chan *newChannelMsg, 1),
		localCloseChanReqs: make(chan *htlcswitch.ChanClose),
		linkFailures:       make(chan linkFailureReport),
		chanCloseMsgs:      make(chan *closeMsg),

		// Shared resources.
		errorBuffer: errBuffer,
		writePool:   server.writePool,
		readPool:    server.readPool,

		// Lifecycle signals.
		activeSignal: make(chan struct{}),
		queueQuit:    make(chan struct{}),
		quit:         make(chan struct{}),
	}

	copy(p.pubKeyBytes[:], identityKey.SerializeCompressed())

	return p, nil
}
// Start performs the init handshake with the remote peer, loads the channels
// shared with it, and launches the peer's handler goroutines. Only the first
// call does any work; later calls return nil immediately.
//
// An error is returned when the init message cannot be sent, the remote
// peer's first message does not arrive within handshakeTimeout, is not an
// init message or is rejected, or the active channels cannot be fetched or
// loaded.
func (p *peer) Start() error {
	if atomic.AddInt32(&p.started, 1) != 1 {
		return nil
	}

	peerLog.Tracef("Peer %v starting", p)

	if err := p.sendInitMsg(); err != nil {
		return fmt.Errorf("unable to send init msg: %v", err)
	}

	// Read the remote peer's first message in the background so the wait
	// can be bounded. Both channels have a one-slot buffer, so the reader
	// never blocks on them even when nobody is listening any more.
	readErr := make(chan error, 1)
	msgChan := make(chan lnwire.Message, 1)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()

		firstMsg, err := p.readNextMessage()
		if err != nil {
			firstMsg = nil
		}
		readErr <- err
		msgChan <- firstMsg
	}()

	select {
	case err := <-readErr:
		if err != nil {
			return fmt.Errorf("unable to read init msg: %v", err)
		}

	case <-time.After(handshakeTimeout):
		return fmt.Errorf("peer did not complete handshake within %v",
			handshakeTimeout)
	}

	// The first message of a connection must be an init message.
	initMsg, isInit := (<-msgChan).(*lnwire.Init)
	if !isInit {
		return errors.New("very first message between nodes " +
			"must be init message")
	}
	if err := p.handleInitMsg(initMsg); err != nil {
		p.storeError(err)
		return err
	}

	activeChans, err := p.server.chanDB.FetchOpenChannels(p.addr.IdentityKey)
	if err != nil {
		peerLog.Errorf("unable to fetch active chans "+
			"for peer %v: %v", p, err)
		return err
	}

	// Without any open channel the persistent connection to this peer is
	// pruned.
	if len(activeChans) == 0 {
		p.server.prunePersistentPeerConnection(p.pubKeyBytes)
	}

	peerLog.Debugf("Loaded %v active channels from database with "+
		"NodeKey(%x)", len(activeChans), p.PubKey())

	syncMsgs, err := p.loadActiveChannels(activeChans)
	if err != nil {
		return fmt.Errorf("unable to load channels: %v", err)
	}

	p.startTime = time.Now()

	// Launch the long-lived handlers, then signal that the peer is active.
	p.wg.Add(5)
	go p.queueHandler()
	go p.writeHandler()
	go p.readHandler()
	go p.channelManager()
	go p.pingHandler()

	close(p.activeSignal)

	// Send the sync messages gathered for channels that were not started.
	// A failure here is only logged.
	if len(syncMsgs) > 0 {
		peerLog.Infof("Sending %d channel sync messages to peer after "+
			"loading active channels", len(syncMsgs))
		if err := p.SendMessage(true, syncMsgs...); err != nil {
			peerLog.Warnf("Failed sending channel sync "+
				"messages to peer %v: %v", p, err)
		}
	}

	p.maybeSendNodeAnn(activeChans)

	return nil
}
// initGossipSync registers the peer with the gossiper's sync state when the
// remote peer advertises the GossipQueriesOptional feature. Otherwise it does
// nothing.
func (p *peer) initGossipSync() {
	if !p.remoteFeatures.HasFeature(lnwire.GossipQueriesOptional) {
		return
	}

	srvrLog.Infof("Negotiated chan series queries with %x",
		p.pubKeyBytes[:])

	p.server.authGossiper.InitSyncState(p)
}
// QuitSignal returns a receive-only view of the peer's quit channel, which is
// closed when the peer disconnects.
func (p *peer) QuitSignal() <-chan struct{} {
	var quit <-chan struct{} = p.quit
	return quit
}
// loadActiveChannels creates a channel state machine for every given database
// channel and registers a link with the switch for each one that can be
// started.
//
// Channels whose status is neither default nor restored are not started;
// instead their channel sync message is collected and returned so the caller
// can send it to the peer. Pending channels are recorded in activeChannels
// with a nil value and get no link.
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
	[]lnwire.Message, error) {

	srv := p.server

	var syncMsgs []lnwire.Message

	for _, dbChan := range chans {
		lnChan, err := lnwallet.NewLightningChannel(
			srv.cc.signer, dbChan, srv.sigPool,
		)
		if err != nil {
			return nil, err
		}

		chanPoint := &dbChan.FundingOutpoint
		chanID := lnwire.NewChanIDFromOutPoint(chanPoint)

		peerLog.Infof("NodeKey(%x) loading ChannelPoint(%v)",
			p.PubKey(), chanPoint)

		// Skip channels that are in neither the default nor the restored
		// state, keeping only their sync message.
		startable := dbChan.HasChanStatus(channeldb.ChanStatusDefault) ||
			dbChan.HasChanStatus(channeldb.ChanStatusRestored)
		if !startable {
			peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+
				"start.", chanPoint, dbChan.ChanStatus())

			chanSync, syncErr := dbChan.ChanSyncMsg()
			if syncErr == nil {
				syncMsgs = append(syncMsgs, chanSync)
			} else {
				peerLog.Errorf("Unable to create channel "+
					"reestablish message for channel %v: "+
					"%v", chanPoint, syncErr)
			}
			continue
		}

		_, currentHeight, err := srv.cc.chainIO.GetBestBlock()
		if err != nil {
			return nil, err
		}

		// Look up our own edge policy for the channel. A missing edge is
		// tolerated and leads to the default policy below.
		graph := srv.chanDB.ChannelGraph()
		info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint)
		if err != nil && err != channeldb.ErrEdgeNotFound {
			return nil, err
		}

		// The first policy is ours only when we are node 1 of the edge.
		selfPolicy := p2
		if info != nil && bytes.Equal(info.NodeKey1Bytes[:],
			srv.identityPriv.PubKey().SerializeCompressed()) {

			selfPolicy = p1
		}

		var forwardingPolicy *htlcswitch.ForwardingPolicy
		if selfPolicy == nil {
			peerLog.Warnf("Unable to find our forwarding policy "+
				"for channel %v, using default values",
				chanPoint)
			forwardingPolicy = &srv.cc.routingPolicy
		} else {
			forwardingPolicy = &htlcswitch.ForwardingPolicy{
				MinHTLCOut:    selfPolicy.MinHTLC,
				MaxHTLC:       selfPolicy.MaxHTLC,
				BaseFee:       selfPolicy.FeeBaseMSat,
				FeeRate:       selfPolicy.FeeProportionalMillionths,
				TimeLockDelta: uint32(selfPolicy.TimeLockDelta),
			}
		}

		peerLog.Tracef("Using link policy of: %v",
			spew.Sdump(forwardingPolicy))

		// A pending channel is only marked as known (nil entry); no link
		// is created for it.
		if lnChan.IsPending() {
			p.activeChanMtx.Lock()
			p.activeChannels[chanID] = nil
			p.activeChanMtx.Unlock()

			continue
		}

		chainEvents, err := srv.chainArb.SubscribeChannelEvents(
			*chanPoint,
		)
		if err != nil {
			return nil, err
		}

		if err := p.addLink(
			chanPoint, lnChan, forwardingPolicy, chainEvents,
			currentHeight, true,
		); err != nil {
			return nil, fmt.Errorf("unable to add link %v to "+
				"switch: %v", chanPoint, err)
		}

		p.activeChanMtx.Lock()
		p.activeChannels[chanID] = lnChan
		p.activeChanMtx.Unlock()
	}

	return syncMsgs, nil
}
// addLink builds a channel link for lnChan and registers it with the switch.
// Any link already registered under the same channel ID is removed first.
//
// forwardingPolicy is copied into the link configuration, chainEvents is the
// chain event subscription for the channel, and syncStates is passed through
// as the link's SyncStates setting. The result of the switch's AddLink call
// is returned.
func (p *peer) addLink(chanPoint *wire.OutPoint,
	lnChan *lnwallet.LightningChannel,
	forwardingPolicy *htlcswitch.ForwardingPolicy,
	chainEvents *contractcourt.ChainEventSubscription,
	currentHeight int32, syncStates bool) error {

	srv := p.server

	// reportFailure hands a link failure to channelManager, giving up if
	// the peer or the server shuts down first.
	reportFailure := func(chanID lnwire.ChannelID,
		shortChanID lnwire.ShortChannelID,
		linkErr htlcswitch.LinkFailureError) {

		report := linkFailureReport{
			chanPoint:   *chanPoint,
			chanID:      chanID,
			shortChanID: shortChanID,
			linkErr:     linkErr,
		}

		select {
		case p.linkFailures <- report:
		case <-p.quit:
		case <-p.server.quit:
		}
	}

	// updateSignals forwards contract signals for this channel to the
	// chain arbitrator.
	updateSignals := func(signals *contractcourt.ContractSignals) error {
		return p.server.chainArb.UpdateContractSignals(
			*chanPoint, signals,
		)
	}

	linkCfg := htlcswitch.ChannelLinkConfig{
		Peer:                    p,
		DecodeHopIterators:      srv.sphinx.DecodeHopIterators,
		ExtractErrorEncrypter:   srv.sphinx.ExtractErrorEncrypter,
		FetchLastChannelUpdate:  srv.fetchLastChanUpdate(),
		HodlMask:                cfg.Hodl.Mask(),
		Registry:                srv.invoices,
		Switch:                  srv.htlcSwitch,
		Circuits:                srv.htlcSwitch.CircuitModifier(),
		ForwardPackets:          srv.htlcSwitch.ForwardPackets,
		FwrdingPolicy:           *forwardingPolicy,
		FeeEstimator:            srv.cc.feeEstimator,
		PreimageCache:           srv.witnessBeacon,
		ChainEvents:             chainEvents,
		UpdateContractSignals:   updateSignals,
		OnChannelFailure:        reportFailure,
		SyncStates:              syncStates,
		BatchTicker:             ticker.New(50 * time.Millisecond),
		FwdPkgGCTicker:          ticker.New(time.Minute),
		PendingCommitTicker:     ticker.New(time.Minute),
		BatchSize:               10,
		UnsafeReplay:            cfg.UnsafeReplay,
		MinFeeUpdateTimeout:     htlcswitch.DefaultMinLinkFeeUpdateTimeout,
		MaxFeeUpdateTimeout:     htlcswitch.DefaultMaxLinkFeeUpdateTimeout,
		OutgoingCltvRejectDelta: p.outgoingCltvRejectDelta,
		TowerClient:             srv.towerClient,
		MaxOutgoingCltvExpiry:   cfg.MaxOutgoingCltvExpiry,
		MaxFeeAllocation:        cfg.MaxChannelFeeAllocation,
		NotifyActiveLink:        srv.channelNotifier.NotifyActiveLinkEvent,
		NotifyActiveChannel:     srv.channelNotifier.NotifyActiveChannelEvent,
		NotifyInactiveChannel:   srv.channelNotifier.NotifyInactiveChannelEvent,
		HtlcNotifier:            srv.htlcNotifier,
	}

	newLink := htlcswitch.NewChannelLink(linkCfg, lnChan)

	// Drop any link already registered for this channel before adding the
	// new one.
	srv.htlcSwitch.RemoveLink(newLink.ChanID())

	return srv.htlcSwitch.AddLink(newLink)
}
// maybeSendNodeAnn sends our node announcement to the peer, but only when at
// least one of the given channels is both confirmed (not pending) and flagged
// for announcement. Failures to generate or send the announcement are logged
// at debug level and otherwise ignored.
func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
	var announceable bool
	for _, c := range channels {
		isPublic := c.ChannelFlags&lnwire.FFAnnounceChannel != 0
		if !c.IsPending && isPublic {
			announceable = true
			break
		}
	}
	if !announceable {
		return
	}

	nodeAnn, err := p.server.genNodeAnnouncement(false)
	if err != nil {
		srvrLog.Debugf("Unable to retrieve node announcement: %v", err)
		return
	}

	err = p.SendMessageLazy(false, &nodeAnn)
	if err != nil {
		srvrLog.Debugf("Unable to resend node announcement to %x: %v",
			p.pubKeyBytes, err)
	}
}
// WaitForDisconnect blocks until all of the peer's goroutines have exited.
// Before waiting on them it first waits for either ready to be signalled or
// the peer's quit channel to be closed, whichever happens first.
func (p *peer) WaitForDisconnect(ready chan struct{}) {
	select {
	case <-p.quit:
	case <-ready:
	}

	p.wg.Wait()
}
// Disconnect tears down the connection to the peer: it records and logs the
// reason, closes the underlying connection and closes the quit channel so
// that the peer's goroutines exit. Only the first call has any effect; later
// calls return immediately.
func (p *peer) Disconnect(reason error) {
	if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
		return
	}

	err := fmt.Errorf("disconnecting %s, reason: %v", p, reason)
	p.storeError(err)

	// Log through an explicit verb rather than using the message as the
	// format string: the text embeds the caller-supplied reason, and any
	// '%' in it would otherwise be interpreted as a formatting directive.
	peerLog.Infof("%v", err)

	// Closing is best effort; the connection is being abandoned either
	// way, so a close error is deliberately not acted upon.
	p.conn.Close()

	close(p.quit)
}
// String renders the peer as "<hex pubkey>@<remote address>".
func (p *peer) String() string {
	remote := p.conn.RemoteAddr()
	return fmt.Sprintf("%x@%s", p.pubKeyBytes, remote)
}
// readNextMessage reads and decodes the next wire message from the peer.
//
// The header is awaited without a read deadline. Once it has arrived, the
// body is read into a buffer borrowed from the read pool, under a deadline of
// readMessageTimeout. The number of body bytes read is added to
// bytesReceived even when the read fails.
//
// An error is returned if the connection is not a *brontide.Conn, if setting
// a deadline or reading fails, or if the message cannot be decoded.
func (p *peer) readNextMessage() (lnwire.Message, error) {
	noiseConn, ok := p.conn.(*brontide.Conn)
	if !ok {
		return nil, fmt.Errorf("brontide.Conn required to read messages")
	}

	// The zero time clears any previous deadline, so waiting for the next
	// header can block indefinitely.
	if err := noiseConn.SetReadDeadline(time.Time{}); err != nil {
		return nil, err
	}

	pktLen, err := noiseConn.ReadNextHeader()
	if err != nil {
		return nil, err
	}

	// Read the body with a pooled buffer and a bounded deadline.
	var rawMsg []byte
	readBody := func(buf *buffer.Read) error {
		deadline := time.Now().Add(readMessageTimeout)
		if dlErr := noiseConn.SetReadDeadline(deadline); dlErr != nil {
			return dlErr
		}

		var bodyErr error
		rawMsg, bodyErr = noiseConn.ReadNextBody(buf[:pktLen])
		return bodyErr
	}
	err = p.readPool.Submit(readBody)

	atomic.AddUint64(&p.bytesReceived, uint64(len(rawMsg)))
	if err != nil {
		return nil, err
	}

	nextMsg, err := lnwire.ReadMessage(bytes.NewReader(rawMsg), 0)
	if err != nil {
		return nil, err
	}

	p.logWireMessage(nextMsg, true)

	return nextMsg, nil
}
// msgStream is a bounded, ordered message queue with a single consumer
// goroutine (msgConsumer) that passes each queued message to apply.
// Producers add messages with AddMsg.
type msgStream struct {
// streamShutdown is set to 1 atomically when msgConsumer exits; Stop
// polls it.
streamShutdown int32
// peer is the owning peer; its quit channel also stops the stream.
peer *peer
// apply is called by the consumer for every message, in queue order.
apply func(lnwire.Message)
// startMsg and stopMsg are logged at trace level when the consumer
// starts and exits.
startMsg string
stopMsg string
// msgCond signals the consumer that msgs may be non-empty; its lock is
// mtx.
msgCond *sync.Cond
// msgs is the queue of pending messages, guarded by mtx.
msgs []lnwire.Message
mtx sync.Mutex
// producerSema holds one token per free queue slot: AddMsg takes a
// token, and the consumer returns one after applying a message.
producerSema chan struct{}
// wg tracks the consumer goroutine.
wg sync.WaitGroup
// quit is closed by Stop.
quit chan struct{}
}
// newMsgStream creates a message stream for peer p that can hold up to
// bufSize queued messages and passes each one to apply. startMsg and stopMsg
// are logged when the consumer starts and exits. The stream is idle until
// Start is called.
func newMsgStream(p *peer, startMsg, stopMsg string, bufSize uint32,
	apply func(lnwire.Message)) *msgStream {

	// Fill the semaphore with one token per free queue slot.
	sema := make(chan struct{}, bufSize)
	for free := bufSize; free > 0; free-- {
		sema <- struct{}{}
	}

	stream := &msgStream{
		peer:         p,
		apply:        apply,
		startMsg:     startMsg,
		stopMsg:      stopMsg,
		producerSema: sema,
		quit:         make(chan struct{}),
	}
	stream.msgCond = sync.NewCond(&stream.mtx)

	return stream
}
// Start launches the stream's consumer goroutine.
func (ms *msgStream) Start() {
	ms.wg.Add(1)

	go func() {
		ms.msgConsumer()
	}()
}
// Stop shuts the stream down and blocks until the consumer goroutine has
// exited. The consumer may be parked on the condition variable, so it is
// signalled every 100ms until it reports shutdown.
func (ms *msgStream) Stop() {
	close(ms.quit)

	for {
		if atomic.LoadInt32(&ms.streamShutdown) != 0 {
			break
		}

		ms.msgCond.Signal()
		time.Sleep(100 * time.Millisecond)
	}

	ms.wg.Wait()
}
func (ms *msgStream) msgConsumer() {
defer ms.wg.Done()
defer peerLog.Tracef(ms.stopMsg)
defer atomic.StoreInt32(&ms.streamShutdown, 1)
peerLog.Tracef(ms.startMsg)
for {
ms.msgCond.L.Lock()
for len(ms.msgs) == 0 {
ms.msgCond.Wait()
select {
case <-ms.peer.quit:
ms.msgCond.L.Unlock()
return
case <-ms.quit:
ms.msgCond.L.Unlock()
return
default:
}
}
msg := ms.msgs[0]
ms.msgs[0] = nil ms.msgs = ms.msgs[1:]
ms.msgCond.L.Unlock()
ms.apply(msg)
select {
case ms.producerSema <- struct{}{}:
case <-ms.peer.quit:
return
case <-ms.quit:
return
}
}
}
// AddMsg appends msg to the stream's queue and wakes the consumer. It blocks
// while the queue is full; if the stream or the peer shuts down in the
// meantime the message is dropped.
func (ms *msgStream) AddMsg(msg lnwire.Message) {
	// Acquire a free queue slot, or give up on shutdown.
	select {
	case <-ms.peer.quit:
		return
	case <-ms.quit:
		return
	case <-ms.producerSema:
	}

	queueLock := ms.msgCond.L
	queueLock.Lock()
	ms.msgs = append(ms.msgs, msg)
	queueLock.Unlock()

	ms.msgCond.Signal()
}
// waitUntilLinkActive returns the switch's link for channel cid, waiting for
// it to become active if necessary. It returns nil if subscribing to channel
// events fails or if the peer quits while waiting.
//
// The subscription is created before the first lookup so that an activation
// occurring between the lookup and the wait is not missed.
func waitUntilLinkActive(p *peer, cid lnwire.ChannelID) htlcswitch.ChannelLink {
	sub, err := p.server.channelNotifier.SubscribeChannelEvents()
	if err != nil {
		return nil
	}
	defer sub.Cancel()

	// The link may already be registered.
	if link, _ := p.server.htlcSwitch.GetLink(cid); link != nil {
		return link
	}

	for {
		select {
		case update := <-sub.Updates():
			// Only an active-link event for our channel is of
			// interest.
			activeEvt, isActive := update.(channelnotifier.ActiveLinkEvent)
			if !isActive || !cid.IsChanPoint(activeEvt.ChannelPoint) {
				continue
			}

			link, _ := p.server.htlcSwitch.GetLink(cid)
			return link

		case <-p.quit:
			return nil
		}
	}
}
// newChanMsgStream creates the message stream that delivers messages for
// channel cid to its link. The link is looked up lazily when the first
// message is applied, waiting for it to become active if needed, and is then
// reused for later messages. A message is dropped if no link could be
// obtained or if the peer is already quitting.
func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
	startMsg := fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:])
	stopMsg := fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:])

	// chanLink is cached across invocations of deliver.
	var chanLink htlcswitch.ChannelLink

	deliver := func(msg lnwire.Message) {
		if chanLink == nil {
			chanLink = waitUntilLinkActive(p, cid)
		}
		if chanLink == nil {
			return
		}

		// Do not hand messages to the link once the peer is quitting.
		select {
		case <-p.quit:
			return
		default:
		}

		chanLink.HandleChannelUpdate(msg)
	}

	return newMsgStream(p, startMsg, stopMsg, 1000, deliver)
}
// newDiscMsgStream creates the message stream that hands gossip messages
// received from the peer to the gossiper.
func newDiscMsgStream(p *peer) *msgStream {
	toGossiper := func(msg lnwire.Message) {
		p.server.authGossiper.ProcessRemoteAnnouncement(msg, p)
	}

	return newMsgStream(
		p,
		"Update stream for gossiper created",
		"Update stream for gossiper exited",
		1000,
		toGossiper,
	)
}
// readHandler is the peer's read loop. It reads one wire message at a time
// and dispatches it by type: ping/pong handling, the funding manager, the
// channel close path, the gossiper stream, or a per-channel link stream.
// The loop ends when the peer is disconnected or a read fails with an error
// that is not explicitly tolerated; the peer is then disconnected.
func (p *peer) readHandler() {
defer p.wg.Done()
// Disconnect the peer if no read completes within idleTimeout. The timer
// is stopped after every read and re-armed once the message is handled.
idleTimer := time.AfterFunc(idleTimeout, func() {
err := fmt.Errorf("peer %s no answer for %s -- disconnecting",
p, idleTimeout)
p.Disconnect(err)
})
p.initGossipSync()
// Gossip messages are handed to the gossiper through their own stream so
// that processing them does not block this loop.
discStream := newDiscMsgStream(p)
discStream.Start()
defer discStream.Stop()
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
nextMsg, err := p.readNextMessage()
// NOTE(review): a timer created with time.AfterFunc does not use its C
// channel (it is nil), so this drain always takes the default branch.
if !idleTimer.Stop() {
select {
case <-idleTimer.C:
default:
}
}
if err != nil {
peerLog.Infof("unable to read message from %v: %v",
p, err)
// Unknown messages, unknown address types and invalid node aliases
// are tolerated: re-arm the timer and keep reading. The first two
// are also recorded in the error buffer. Any other error ends the
// loop.
switch e := err.(type) {
case *lnwire.UnknownMessage:
p.storeError(e)
idleTimer.Reset(idleTimeout)
continue
case *lnwire.ErrUnknownAddrType:
p.storeError(e)
idleTimer.Reset(idleTimeout)
continue
case *lnwire.ErrInvalidNodeAlias:
idleTimer.Reset(idleTimeout)
continue
default:
break out
}
}
// targetChan and isLinkUpdate are set by the cases below when the
// message must be forwarded to a channel link stream.
var (
targetChan lnwire.ChannelID
isLinkUpdate bool
)
switch msg := nextMsg.(type) {
case *lnwire.Pong:
// Record the round trip since the last Ping was sent; dividing the
// nanosecond difference by 1000 yields microseconds.
pingSendTime := atomic.LoadInt64(&p.pingLastSend)
delay := (time.Now().UnixNano() - pingSendTime) / 1000
atomic.StoreInt64(&p.pingTime, delay)
case *lnwire.Ping:
// Reply with a Pong carrying the requested number of (zero) bytes.
pongBytes := make([]byte, msg.NumPongBytes)
p.queueMsg(lnwire.NewPong(pongBytes), nil)
case *lnwire.OpenChannel:
p.server.fundingMgr.processFundingOpen(msg, p)
case *lnwire.AcceptChannel:
p.server.fundingMgr.processFundingAccept(msg, p)
case *lnwire.FundingCreated:
p.server.fundingMgr.processFundingCreated(msg, p)
case *lnwire.FundingSigned:
p.server.fundingMgr.processFundingSigned(msg, p)
case *lnwire.FundingLocked:
p.server.fundingMgr.processFundingLocked(msg, p)
// Close messages are handed to channelManager; stop reading if the
// peer quits before they can be delivered.
case *lnwire.Shutdown:
select {
case p.chanCloseMsgs <- &closeMsg{msg.ChannelID, msg}:
case <-p.quit:
break out
}
case *lnwire.ClosingSigned:
select {
case p.chanCloseMsgs <- &closeMsg{msg.ChannelID, msg}:
case <-p.quit:
break out
}
case *lnwire.Error:
targetChan = msg.ChanID
isLinkUpdate = p.handleError(msg)
case *lnwire.ChannelReestablish:
targetChan = msg.ChanID
isLinkUpdate = p.isActiveChannel(targetChan)
// For a channel that is not active, try to resend the channel sync
// message instead of forwarding to a link.
if !isLinkUpdate {
err := p.resendChanSyncMsg(targetChan)
if err != nil {
peerLog.Errorf("resend failed: %v",
err)
}
}
case LinkUpdater:
targetChan = msg.TargetChanID()
isLinkUpdate = p.isActiveChannel(targetChan)
case *lnwire.ChannelUpdate,
*lnwire.ChannelAnnouncement,
*lnwire.NodeAnnouncement,
*lnwire.AnnounceSignatures,
*lnwire.GossipTimestampRange,
*lnwire.QueryShortChanIDs,
*lnwire.QueryChannelRange,
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:
discStream.AddMsg(msg)
default:
err := fmt.Errorf("unknown message type %v received",
uint16(msg.MsgType()))
p.storeError(err)
peerLog.Errorf("peer: %v, %v", p, err)
}
if isLinkUpdate {
// Create the channel's stream on first use. The deferred Stop runs
// when readHandler returns, not at the end of this iteration, so
// the stream lives as long as the read loop.
chanStream, ok := p.activeMsgStreams[targetChan]
if !ok {
chanStream = newChanMsgStream(p, targetChan)
p.activeMsgStreams[targetChan] = chanStream
chanStream.Start()
defer chanStream.Stop()
}
chanStream.AddMsg(nextMsg)
}
idleTimer.Reset(idleTimeout)
}
p.Disconnect(errors.New("read handler closed"))
peerLog.Tracef("readHandler for peer %v done", p)
}
// isActiveChannel reports whether chanID has an entry in activeChannels.
// Pending channels, which are stored with a nil value, count as well.
func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool {
	p.activeChanMtx.RLock()
	defer p.activeChanMtx.RUnlock()

	_, known := p.activeChannels[chanID]
	return known
}
func (p *peer) storeError(err error) {
var haveChannels bool
p.activeChanMtx.RLock()
for _, channel := range p.activeChannels {
if channel == nil {
continue
}
haveChannels = true
break
}
p.activeChanMtx.RUnlock()
if !haveChannels {
peerLog.Tracef("no channels with peer: %v, not storing err", p)
return
}
p.errorBuffer.Add(
×tampedError{timestamp: time.Now(), error: err},
)
}
// handleError processes an Error message received from the peer. The error is
// always recorded via storeError. The return value tells the caller whether
// the message must additionally be forwarded to the link of msg.ChanID:
//
//   - a connection-wide error is delivered to every active channel stream
//     right here, and false is returned;
//   - an error for a channel that is pending in the funding manager is
//     handed to the funding manager, and false is returned;
//   - otherwise the result is whether the channel is active.
func (p *peer) handleError(msg *lnwire.Error) bool {
	key := p.addr.IdentityKey

	p.storeError(msg)

	if msg.ChanID == lnwire.ConnectionWideID {
		for _, stream := range p.activeMsgStreams {
			stream.AddMsg(msg)
		}
		return false
	}

	if p.server.fundingMgr.IsPendingChannel(msg.ChanID, key) {
		p.server.fundingMgr.processFundingError(msg, key)
		return false
	}

	return p.isActiveChannel(msg.ChanID)
}
// messageSummary returns a short, single-line description of the most
// relevant fields of msg for logging. It returns the empty string for Init,
// Ping and Pong messages and for any message type it does not know.
func messageSummary(msg lnwire.Message) string {
	switch m := msg.(type) {
	// Messages without a summary.
	case *lnwire.Init, *lnwire.Ping, *lnwire.Pong:
		return ""

	// Channel funding.
	case *lnwire.OpenChannel:
		return fmt.Sprintf("temp_chan_id=%x, chain=%v, csv=%v, amt=%v, "+
			"push_amt=%v, reserve=%v, flags=%v",
			m.PendingChannelID[:], m.ChainHash, m.CsvDelay,
			m.FundingAmount, m.PushAmount, m.ChannelReserve,
			m.ChannelFlags)

	case *lnwire.AcceptChannel:
		return fmt.Sprintf("temp_chan_id=%x, reserve=%v, csv=%v, num_confs=%v",
			m.PendingChannelID[:], m.ChannelReserve, m.CsvDelay,
			m.MinAcceptDepth)

	case *lnwire.FundingCreated:
		return fmt.Sprintf("temp_chan_id=%x, chan_point=%v",
			m.PendingChannelID[:], m.FundingPoint)

	case *lnwire.FundingSigned:
		return fmt.Sprintf("chan_id=%v", m.ChanID)

	case *lnwire.FundingLocked:
		nextPoint := m.NextPerCommitmentPoint.SerializeCompressed()
		return fmt.Sprintf("chan_id=%v, next_point=%x",
			m.ChanID, nextPoint)

	// Channel close.
	case *lnwire.Shutdown:
		return fmt.Sprintf("chan_id=%v, script=%x",
			m.ChannelID, m.Address[:])

	case *lnwire.ClosingSigned:
		return fmt.Sprintf("chan_id=%v, fee_sat=%v",
			m.ChannelID, m.FeeSatoshis)

	// Commitment updates.
	case *lnwire.UpdateAddHTLC:
		return fmt.Sprintf("chan_id=%v, id=%v, amt=%v, expiry=%v, hash=%x",
			m.ChanID, m.ID, m.Amount, m.Expiry, m.PaymentHash[:])

	case *lnwire.UpdateFailHTLC:
		return fmt.Sprintf("chan_id=%v, id=%v, reason=%x",
			m.ChanID, m.ID, m.Reason)

	case *lnwire.UpdateFulfillHTLC:
		return fmt.Sprintf("chan_id=%v, id=%v, pre_image=%x",
			m.ChanID, m.ID, m.PaymentPreimage[:])

	case *lnwire.UpdateFailMalformedHTLC:
		return fmt.Sprintf("chan_id=%v, id=%v, fail_code=%v",
			m.ChanID, m.ID, m.FailureCode)

	case *lnwire.UpdateFee:
		return fmt.Sprintf("chan_id=%v, fee_update_sat=%v",
			m.ChanID, int64(m.FeePerKw))

	case *lnwire.CommitSig:
		return fmt.Sprintf("chan_id=%v, num_htlcs=%v",
			m.ChanID, len(m.HtlcSigs))

	case *lnwire.RevokeAndAck:
		nextPoint := m.NextRevocationKey.SerializeCompressed()
		return fmt.Sprintf("chan_id=%v, rev=%x, next_point=%x",
			m.ChanID, m.Revocation[:], nextPoint)

	case *lnwire.ChannelReestablish:
		return fmt.Sprintf("next_local_height=%v, remote_tail_height=%v",
			m.NextLocalCommitHeight, m.RemoteCommitTailHeight)

	case *lnwire.Error:
		return fmt.Sprintf("%v", m.Error())

	// Gossip announcements.
	case *lnwire.AnnounceSignatures:
		return fmt.Sprintf("chan_id=%v, short_chan_id=%v",
			m.ChannelID, m.ShortChannelID.ToUint64())

	case *lnwire.ChannelAnnouncement:
		return fmt.Sprintf("chain_hash=%v, short_chan_id=%v",
			m.ChainHash, m.ShortChannelID.ToUint64())

	case *lnwire.ChannelUpdate:
		updateTime := time.Unix(int64(m.Timestamp), 0)
		return fmt.Sprintf("chain_hash=%v, short_chan_id=%v, "+
			"mflags=%v, cflags=%v, update_time=%v",
			m.ChainHash, m.ShortChannelID.ToUint64(),
			m.MessageFlags, m.ChannelFlags, updateTime)

	case *lnwire.NodeAnnouncement:
		updateTime := time.Unix(int64(m.Timestamp), 0)
		return fmt.Sprintf("node=%x, update_time=%v",
			m.NodeID, updateTime)

	// Gossip queries.
	case *lnwire.ReplyShortChanIDsEnd:
		return fmt.Sprintf("chain_hash=%v, complete=%v",
			m.ChainHash, m.Complete)

	case *lnwire.ReplyChannelRange:
		return fmt.Sprintf("start_height=%v, end_height=%v, "+
			"num_chans=%v, encoding=%v",
			m.FirstBlockHeight, m.LastBlockHeight(),
			len(m.ShortChanIDs), m.EncodingType)

	case *lnwire.QueryShortChanIDs:
		return fmt.Sprintf("chain_hash=%v, encoding=%v, num_chans=%v",
			m.ChainHash, m.EncodingType, len(m.ShortChanIDs))

	case *lnwire.QueryChannelRange:
		return fmt.Sprintf("chain_hash=%v, start_height=%v, "+
			"end_height=%v",
			m.ChainHash, m.FirstBlockHeight, m.LastBlockHeight())

	case *lnwire.GossipTimestampRange:
		firstStamp := time.Unix(int64(m.FirstTimestamp), 0)
		return fmt.Sprintf("chain_hash=%v, first_stamp=%v, "+
			"stamp_range=%v",
			m.ChainHash, firstStamp, m.TimestampRange)
	}

	return ""
}
// logWireMessage logs msg twice: a one-line summary at debug level and a full
// dump at trace level. read selects the wording: true for a message received
// from the peer, false for one being sent to it.
//
// NOTE: between the two log calls the Curve field of the public keys carried
// by certain message types is set to nil on the message itself, presumably
// to keep the dump compact -- confirm. Callers must not rely on those Curve
// fields afterwards.
func (p *peer) logWireMessage(msg lnwire.Message, read bool) {
	verb, preposition, tracePrefix := "Sending", "to", "writeMessage to"
	if read {
		verb, preposition, tracePrefix = "Received", "from", "readMessage from"
	}

	// Both log lines are built lazily through log closures.
	peerLog.Debugf("%v", newLogClosure(func() string {
		summary := messageSummary(msg)
		if summary != "" {
			summary = "(" + summary + ")"
		}

		return fmt.Sprintf("%v %v%s %v %s", verb, msg.MsgType(),
			summary, preposition, p)
	}))

	// This must happen after the summary above, which serializes some of
	// these keys.
	switch m := msg.(type) {
	case *lnwire.OpenChannel:
		m.FundingKey.Curve = nil
		m.RevocationPoint.Curve = nil
		m.PaymentPoint.Curve = nil
		m.DelayedPaymentPoint.Curve = nil
		m.HtlcPoint.Curve = nil
		m.FirstCommitmentPoint.Curve = nil

	case *lnwire.AcceptChannel:
		m.FundingKey.Curve = nil
		m.RevocationPoint.Curve = nil
		m.PaymentPoint.Curve = nil
		m.DelayedPaymentPoint.Curve = nil
		m.HtlcPoint.Curve = nil
		m.FirstCommitmentPoint.Curve = nil

	case *lnwire.FundingLocked:
		m.NextPerCommitmentPoint.Curve = nil

	case *lnwire.RevokeAndAck:
		m.NextRevocationKey.Curve = nil

	case *lnwire.ChannelReestablish:
		// This point may be absent.
		if m.LocalUnrevokedCommitPoint != nil {
			m.LocalUnrevokedCommitPoint.Curve = nil
		}
	}

	peerLog.Tracef(tracePrefix+" %v: %v", p, newLogClosure(func() string {
		return spew.Sdump(msg)
	}))
}
// writeMessage encodes msg, buffers it on the connection and flushes it to
// the peer. A nil msg skips encoding and only flushes whatever is already
// buffered, which is how a timed-out write is resumed.
//
// It returns lnpeer.ErrPeerExiting if the peer has been disconnected, and an
// error if the connection is not a *brontide.Conn or if encoding, buffering
// or flushing fails. Each flush runs under a deadline of writeMessageTimeout,
// and the bytes it writes are added to bytesSent.
func (p *peer) writeMessage(msg lnwire.Message) error {
	if atomic.LoadInt32(&p.disconnect) != 0 {
		return lnpeer.ErrPeerExiting
	}

	if msg != nil {
		p.logWireMessage(msg, false)
	}

	noiseConn, ok := p.conn.(*brontide.Conn)
	if !ok {
		return fmt.Errorf("brontide.Conn required to write messages")
	}

	// flush writes the buffered bytes out under a fresh deadline.
	flush := func() error {
		deadline := time.Now().Add(writeMessageTimeout)
		if err := noiseConn.SetWriteDeadline(deadline); err != nil {
			return err
		}

		written, err := noiseConn.Flush()
		if written > 0 {
			atomic.AddUint64(&p.bytesSent, uint64(written))
		}

		return err
	}

	if msg != nil {
		// Serialize into a pooled buffer and hand the bytes to the
		// connection.
		encode := func(buf *bytes.Buffer) error {
			if _, err := lnwire.WriteMessage(buf, msg, 0); err != nil {
				return err
			}

			return noiseConn.WriteMessage(buf.Bytes())
		}

		if err := p.writePool.Submit(encode); err != nil {
			return err
		}
	}

	return flush()
}
// writeHandler is the peer's write loop. It takes messages from sendQueue
// one at a time, writes them to the connection and reports the result on the
// message's errChan when one is set. The loop ends on the first failed write
// or when the peer quits; the peer is then disconnected.
func (p *peer) writeHandler() {
// Disconnect the peer if no write completes within idleTimeout. The timer
// is re-armed after every write attempt that is not a timeout.
idleTimer := time.AfterFunc(idleTimeout, func() {
err := fmt.Errorf("peer %s no write for %s -- disconnecting",
p, idleTimeout)
p.Disconnect(err)
})
var exitErr error
out:
for {
select {
case outMsg := <-p.sendQueue:
// Remember when a Ping leaves so the matching Pong can be timed.
if _, ok := outMsg.msg.(*lnwire.Ping); ok {
now := time.Now().UnixNano()
atomic.StoreInt64(&p.pingLastSend, now)
}
startTime := time.Now()
retry:
err := p.writeMessage(outMsg.msg)
// On a write timeout, retry with a nil message: writeMessage then
// only flushes what is already buffered instead of encoding the
// message a second time.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
peerLog.Debugf("Write timeout detected for "+
"peer %s, first write for message "+
"attempted %v ago", p,
time.Since(startTime))
outMsg.msg = nil
goto retry
}
// NOTE(review): a timer created with time.AfterFunc does not use its C
// channel (it is nil), so this drain always takes the default branch.
if !idleTimer.Stop() {
select {
case <-idleTimer.C:
default:
}
}
idleTimer.Reset(idleTimeout)
// Report the outcome (nil on success) to the sender, if requested.
if outMsg.errChan != nil {
outMsg.errChan <- err
}
if err != nil {
exitErr = fmt.Errorf("unable to write "+
"message: %v", err)
break out
}
case <-p.quit:
exitErr = lnpeer.ErrPeerExiting
break out
}
}
// The wait group is released explicitly before Disconnect rather than via
// defer; presumably so that this goroutine already counts as finished for
// anything waiting on p.wg while Disconnect runs -- confirm.
p.wg.Done()
p.Disconnect(exitErr)
peerLog.Tracef("writeHandler for peer %v done", p)
}
// queueHandler buffers outgoing messages between queue and writeHandler so
// that producers are not blocked by a slow connection. Messages are kept in
// two FIFO lists: priority messages are always offered to the writer before
// lazy ones. The handler runs until the peer quits.
func (p *peer) queueHandler() {
	defer p.wg.Done()

	priorityMsgs := list.New()
	lazyMsgs := list.New()

	// enqueue files an incoming message under its priority class.
	enqueue := func(msg outgoingMsg) {
		if msg.priority {
			priorityMsgs.PushBack(msg)
		} else {
			lazyMsgs.PushBack(msg)
		}
	}

	for {
		// Pick the next message to offer: priority first, then lazy.
		next := priorityMsgs.Front()
		if next == nil {
			next = lazyMsgs.Front()
		}

		// With nothing buffered, sendCh stays nil. A send on a nil
		// channel is never selected, which disables the send case below.
		var (
			sendCh chan outgoingMsg
			front  outgoingMsg
		)
		if next != nil {
			sendCh = p.sendQueue
			front = next.Value.(outgoingMsg)
		}

		select {
		case sendCh <- front:
			// Delivered to the writer; drop it from its list.
			if front.priority {
				priorityMsgs.Remove(next)
			} else {
				lazyMsgs.Remove(next)
			}

		case msg := <-p.outgoingQueue:
			enqueue(msg)

		case <-p.quit:
			return
		}
	}
}
// pingHandler queues a Ping message for the peer every pingInterval until the
// peer quits.
func (p *peer) pingHandler() {
	defer p.wg.Done()

	// numPingBytes is the argument passed to lnwire.NewPing for every
	// ping.
	const numPingBytes = 16

	pingTicker := time.NewTicker(pingInterval)
	defer pingTicker.Stop()

	for {
		select {
		case <-p.quit:
			return

		case <-pingTicker.C:
			p.queueMsg(lnwire.NewPing(numPingBytes), nil)
		}
	}
}
// PingTime returns the most recently measured ping round-trip time, as stored
// by the read handler (in microseconds). It is safe for concurrent use.
func (p *peer) PingTime() int64 {
	rtt := atomic.LoadInt64(&p.pingTime)
	return rtt
}
// queueMsg queues msg for sending as a priority message. If errChan is
// non-nil, the outcome of the send is reported on it.
func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) {
	const priority = true
	p.queue(priority, msg, errChan)
}
// queueMsgLazy queues msg for sending as a lazy message, which is only sent
// once no priority message is waiting. If errChan is non-nil, the outcome of
// the send is reported on it.
func (p *peer) queueMsgLazy(msg lnwire.Message, errChan chan error) {
	const priority = false
	p.queue(priority, msg, errChan)
}
// queue hands msg to the queue handler, blocking until it is accepted or the
// peer quits. If the peer quits first, the message is dropped and
// lnpeer.ErrPeerExiting is sent on errChan (when non-nil).
func (p *peer) queue(priority bool, msg lnwire.Message, errChan chan error) {
	outMsg := outgoingMsg{
		priority: priority,
		msg:      msg,
		errChan:  errChan,
	}

	select {
	case p.outgoingQueue <- outMsg:
		return
	case <-p.quit:
	}

	peerLog.Tracef("Peer shutting down, could not enqueue msg.")
	if errChan != nil {
		errChan <- lnpeer.ErrPeerExiting
	}
}
// ChannelSnapshots returns a state snapshot of every channel with the peer
// that is fully set up. Pending channels (nil entries) and channels that do
// not yet have the remote party's next revocation are left out.
func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot {
	p.activeChanMtx.RLock()
	defer p.activeChanMtx.RUnlock()

	snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels))
	for _, lnChan := range p.activeChannels {
		if lnChan == nil || lnChan.RemoteNextRevocation() == nil {
			continue
		}

		snapshots = append(snapshots, lnChan.StateSnapshot())
	}

	return snapshots
}
// genDeliveryScript requests a new witness pubkey address from the wallet
// and returns the script that pays to it, for use as the delivery script of
// a channel close. The address is logged at info level.
func (p *peer) genDeliveryScript() ([]byte, error) {
	wallet := p.server.cc.wallet

	addr, err := wallet.NewAddress(lnwallet.WitnessPubKey, false)
	if err != nil {
		return nil, err
	}

	peerLog.Infof("Delivery addr for channel close: %v",
		addr)

	return txscript.PayToAddrScript(addr)
}
// channelManager is the goroutine that serializes all channel lifecycle
// events for the peer. It handles:
//
//   - requests to add a newly opened channel (newChannels),
//   - locally initiated close requests (localCloseChanReqs),
//   - link failure reports (linkFailures),
//   - close messages received from the peer (chanCloseMsgs),
//   - the one-shot timer after which active channels are re-enabled.
//
// When the peer quits, the state of every non-pending channel is reset and
// the goroutine exits.
func (p *peer) channelManager() {
	defer p.wg.Done()

	// reenableTimeout fires once, chanActiveTimeout after start. It is set
	// to nil afterwards, which disables its select case for good.
	reenableTimeout := time.After(p.chanActiveTimeout)

out:
	for {
		select {
		case newChanReq := <-p.newChannels:
			newChan := newChanReq.channel
			chanPoint := &newChan.FundingOutpoint
			chanID := lnwire.NewChanIDFromOutPoint(chanPoint)

			// The lock is held from the lookup until the new channel
			// has been stored, so the check and the insert are atomic.
			p.activeChanMtx.Lock()
			currentChan, ok := p.activeChannels[chanID]
			if ok && currentChan != nil {
				peerLog.Infof("Already have ChannelPoint(%v), "+
					"ignoring.", chanPoint)

				p.activeChanMtx.Unlock()
				close(newChanReq.err)

				// Nothing more to do if the next revocation is
				// already known.
				if currentChan.RemoteNextRevocation() != nil {
					continue
				}

				peerLog.Infof("Processing retransmitted "+
					"FundingLocked for ChannelPoint(%v)",
					chanPoint)

				nextRevoke := newChan.RemoteNextRevocation
				err := currentChan.InitNextRevocation(nextRevoke)
				if err != nil {
					peerLog.Errorf("unable to init chan "+
						"revocation: %v", err)
				}

				continue
			}

			lnChan, err := lnwallet.NewLightningChannel(
				p.server.cc.signer, newChan, p.server.sigPool,
			)
			if err != nil {
				p.activeChanMtx.Unlock()
				err := fmt.Errorf("unable to create "+
					"LightningChannel: %v", err)
				peerLog.Error(err)

				newChanReq.err <- err
				continue
			}

			p.activeChannels[chanID] = lnChan
			p.addedChannels[chanID] = struct{}{}
			p.activeChanMtx.Unlock()

			peerLog.Infof("New channel active ChannelPoint(%v) "+
				"with NodeKey(%x)", chanPoint, p.PubKey())

			_, currentHeight, err := p.server.cc.chainIO.GetBestBlock()
			if err != nil {
				err := fmt.Errorf("unable to get best "+
					"block: %v", err)
				peerLog.Error(err)

				newChanReq.err <- err
				continue
			}

			chainEvents, err := p.server.chainArb.SubscribeChannelEvents(
				*chanPoint,
			)
			if err != nil {
				err := fmt.Errorf("unable to subscribe to "+
					"chain events: %v", err)
				peerLog.Error(err)

				newChanReq.err <- err
				continue
			}

			// A new channel starts from the default routing policy,
			// with HTLC limits taken from the channel itself.
			fwdMinHtlc := lnChan.FwdMinHtlc()
			defaultPolicy := p.server.cc.routingPolicy
			forwardingPolicy := &htlcswitch.ForwardingPolicy{
				MinHTLCOut:    fwdMinHtlc,
				MaxHTLC:       newChan.LocalChanCfg.MaxPendingAmount,
				BaseFee:       defaultPolicy.BaseFee,
				FeeRate:       defaultPolicy.FeeRate,
				TimeLockDelta: defaultPolicy.TimeLockDelta,
			}

			// A channel that was already known (stored as pending)
			// has its link created with state syncing enabled.
			shouldReestablish := ok

			err = p.addLink(
				chanPoint, lnChan, forwardingPolicy,
				chainEvents, currentHeight, shouldReestablish,
			)
			if err != nil {
				// Keep the underlying cause in the reported error.
				err := fmt.Errorf("can't register new channel "+
					"link(%v) with NodeKey(%x): %v", chanPoint,
					p.PubKey(), err)
				peerLog.Error(err)

				newChanReq.err <- err
				continue
			}

			close(newChanReq.err)

		case req := <-p.localCloseChanReqs:
			p.handleLocalCloseReq(req)

		case failure := <-p.linkFailures:
			p.handleLinkFailure(failure)

		case closeMsg := <-p.chanCloseMsgs:
			chanCloser, err := p.fetchActiveChanCloser(closeMsg.cid)
			if err != nil {
				// A close message for an unknown channel is silently
				// ignored.
				if err == ErrChannelNotFound {
					continue
				}

				peerLog.Errorf("Unable to respond to remote "+
					"close msg: %v", err)

				errMsg := &lnwire.Error{
					ChanID: closeMsg.cid,
					Data:   lnwire.ErrorData(err.Error()),
				}
				p.queueMsg(errMsg, nil)
				continue
			}

			msgs, closeFin, err := chanCloser.ProcessCloseMsg(
				closeMsg.msg,
			)
			if err != nil {
				err := fmt.Errorf("unable to process close "+
					"msg: %v", err)
				peerLog.Error(err)

				// Abort the close: reset the channel, notify the
				// local requester (if any) and forget the closer.
				chanCloser.cfg.channel.ResetState()

				if chanCloser.CloseRequest() != nil {
					chanCloser.CloseRequest().Err <- err
				}
				delete(p.activeChanCloses, closeMsg.cid)
				continue
			}

			// Send whatever responses the close state machine
			// produced.
			for _, msg := range msgs {
				p.queueMsg(msg, nil)
			}

			if !closeFin {
				continue
			}

			p.finalizeChanClosure(chanCloser)

		case <-reenableTimeout:
			p.reenableActiveChannels()

			// Receiving from a nil channel blocks forever, so this
			// case cannot fire again.
			reenableTimeout = nil

		case <-p.quit:
			// Reset the state of every non-pending channel before
			// exiting.
			p.activeChanMtx.Lock()
			for _, channel := range p.activeChannels {
				if channel == nil {
					continue
				}

				channel.ResetState()
			}
			p.activeChanMtx.Unlock()

			break out
		}
	}
}
// reenableActiveChannels asks the channel status manager to enable every
// public, confirmed channel the peer had when it started. Pending channels,
// private channels and channels added after startup (addedChannels) are
// skipped. Failures are logged and do not stop the remaining requests.
func (p *peer) reenableActiveChannels() {
	// Collect the outpoints under the read lock; the enable requests are
	// made after it has been released.
	var toEnable []wire.OutPoint

	p.activeChanMtx.RLock()
	for chanID, lnChan := range p.activeChannels {
		if lnChan == nil {
			continue
		}

		state := lnChan.State()
		if state.ChannelFlags&lnwire.FFAnnounceChannel == 0 ||
			state.IsPending {

			continue
		}

		if _, addedLater := p.addedChannels[chanID]; !addedLater {
			toEnable = append(toEnable, state.FundingOutpoint)
		}
	}
	p.activeChanMtx.RUnlock()

	for _, chanPoint := range toEnable {
		if err := p.server.chanStatusMgr.RequestEnable(chanPoint); err != nil {
			srvrLog.Errorf("Unable to enable channel %v: %v",
				chanPoint, err)
		}
	}
}
// fetchActiveChanCloser returns the channelCloser registered for chanID in
// p.activeChanCloses, creating and registering a new one when none exists.
//
// ErrChannelNotFound is returned when chanID is missing from
// p.activeChannels or maps to a nil channel. When a new closer has to be
// built, an error is returned if the channel still has active HTLCs, if no
// delivery script can be obtained, if the fee estimator fails, or if the
// best block cannot be queried; the underlying cause is logged and a short
// generic error is returned to the caller.
func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, error) {
	p.activeChanMtx.RLock()
	lnChan, known := p.activeChannels[chanID]
	p.activeChanMtx.RUnlock()

	if !known || lnChan == nil {
		return nil, ErrChannelNotFound
	}

	// A closer that is already registered is handed back untouched.
	if existing, found := p.activeChanCloses[chanID]; found {
		return existing, nil
	}

	// No closer yet: refuse to start one while HTLCs are outstanding.
	if len(lnChan.ActiveHtlcs()) != 0 {
		return nil, fmt.Errorf("cannot co-op close " +
			"channel w/ active htlcs")
	}

	// Prefer the channel's upfront shutdown script and only generate a
	// fresh delivery script when none was set.
	script := lnChan.LocalUpfrontShutdownScript()
	if len(script) == 0 {
		var err error
		script, err = p.genDeliveryScript()
		if err != nil {
			peerLog.Errorf("unable to gen delivery script: %v", err)
			return nil, fmt.Errorf("close addr unavailable")
		}
	}

	// NOTE(review): the argument 6 is presumably a confirmation target in
	// blocks; confirm against the fee estimator's API.
	feeRate, err := p.server.cc.feeEstimator.EstimateFeePerKW(6)
	if err != nil {
		peerLog.Errorf("unable to query fee estimator: %v", err)
		return nil, fmt.Errorf("unable to estimate fee")
	}

	_, bestHeight, err := p.server.cc.chainIO.GetBestBlock()
	if err != nil {
		peerLog.Errorf("unable to obtain best block: %v", err)
		return nil, fmt.Errorf("cannot obtain best block")
	}

	closeCfg := chanCloseCfg{
		channel:           lnChan,
		unregisterChannel: p.server.htlcSwitch.RemoveLink,
		broadcastTx:       p.server.cc.wallet.PublishTransaction,
		disableChannel:    p.server.chanStatusMgr.RequestDisable,
		disconnect: func() error {
			return p.server.DisconnectPeer(p.IdentityKey())
		},
		quit: p.quit,
	}

	// The closer is built without a close request (nil) and with the
	// final flag false, in contrast to handleLocalCloseReq which passes
	// its request and true.
	closer := newChannelCloser(
		closeCfg, script, feeRate, uint32(bestHeight), nil, false,
	)
	p.activeChanCloses[chanID] = closer

	return closer, nil
}
// chooseDeliveryScript picks the delivery script to use given an upfront
// script and a requested script, either of which may be empty:
//
//   - no upfront script: the requested script is returned (possibly empty);
//   - upfront script but no requested script: the upfront script is returned;
//   - both set and byte-equal: the upfront script is returned;
//   - both set and different: errUpfrontShutdownScriptMismatch is returned.
func chooseDeliveryScript(upfront,
	requested lnwire.DeliveryAddress) (lnwire.DeliveryAddress, error) {

	switch {
	case len(upfront) == 0:
		return requested, nil

	case len(requested) == 0:
		return upfront, nil

	case bytes.Equal(upfront, requested):
		return upfront, nil

	default:
		return nil, errUpfrontShutdownScriptMismatch
	}
}
// handleLocalCloseReq services a close request for one of this peer's
// channels. Any failure is logged and delivered on req.Err.
//
// For htlcswitch.CloseRegular a delivery script is selected (upfront script,
// requested script, or a freshly generated one), a channelCloser is created
// and registered in p.activeChanCloses, and the shutdown message it produces
// is queued for the remote peer. For htlcswitch.CloseBreach the channel is
// simply wiped. Other close types are ignored.
func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
	chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)

	p.activeChanMtx.RLock()
	lnChan, known := p.activeChannels[chanID]
	p.activeChanMtx.RUnlock()

	if !known || lnChan == nil {
		unknownErr := fmt.Errorf("unable to close channel, ChannelID(%v) is "+
			"unknown", chanID)
		peerLog.Errorf(unknownErr.Error())
		req.Err <- unknownErr
		return
	}

	// fail logs err verbatim and hands it back to the requester.
	fail := func(err error) {
		peerLog.Errorf(err.Error())
		req.Err <- err
	}

	switch req.CloseType {
	case htlcswitch.CloseBreach:
		peerLog.Infof("ChannelPoint(%v) has been breached, wiping "+
			"channel", req.ChanPoint)
		p.WipeChannel(req.ChanPoint)

	case htlcswitch.CloseRegular:
		// The upfront and requested scripts must agree when both are
		// set; the result may still be empty when neither is.
		script, err := chooseDeliveryScript(
			lnChan.LocalUpfrontShutdownScript(), req.DeliveryScript,
		)
		if err != nil {
			peerLog.Errorf("cannot close channel %v: %v", req.ChanPoint, err)
			req.Err <- err
			return
		}

		// Nothing chosen so far: fall back to a generated script.
		if len(script) == 0 {
			script, err = p.genDeliveryScript()
			if err != nil {
				fail(err)
				return
			}
		}

		_, bestHeight, err := p.server.cc.chainIO.GetBestBlock()
		if err != nil {
			fail(err)
			return
		}

		closeCfg := chanCloseCfg{
			channel:           lnChan,
			unregisterChannel: p.server.htlcSwitch.RemoveLink,
			broadcastTx:       p.server.cc.wallet.PublishTransaction,
			disableChannel:    p.server.chanStatusMgr.RequestDisable,
			disconnect: func() error {
				return p.server.DisconnectPeer(p.IdentityKey())
			},
			quit: p.quit,
		}
		closer := newChannelCloser(
			closeCfg, script, req.TargetFeePerKw,
			uint32(bestHeight), req, true,
		)
		p.activeChanCloses[chanID] = closer

		shutdown, err := closer.ShutdownChan()
		if err != nil {
			fail(err)

			// Shutdown could not be started: unregister the closer
			// and reset the channel's state.
			delete(p.activeChanCloses, chanID)
			lnChan.ResetState()
			return
		}

		p.queueMsg(shutdown, nil)
	}
}
// linkFailureReport carries the details of a failed link to
// handleLinkFailure, which wipes the channel, optionally force closes it and
// optionally sends an error to the remote peer.
type linkFailureReport struct {
// chanPoint is the outpoint passed to WipeChannel and, when a force close
// is requested, to the chain arbitrator's ForceCloseContract.
chanPoint wire.OutPoint
// chanID is used as the ChanID of the lnwire.Error sent to the remote peer.
chanID lnwire.ChannelID
// shortChanID identifies the link in handleLinkFailure's log messages.
shortChanID lnwire.ShortChannelID
// linkErr is the failure itself; its ForceClose flag, ShouldSendToPeer
// result and SendData payload determine what handleLinkFailure does.
linkErr htlcswitch.LinkFailureError
}
// handleLinkFailure reacts to a reported link failure. The channel is always
// wiped first. If the failure is flagged ForceClose, a force close is
// requested from the chain arbitrator and the outcome is logged. Finally, if
// the failure says it should be sent to the peer, an lnwire.Error is sent
// synchronously, carrying linkErr.SendData when that is non-nil and the
// error's string form otherwise.
func (p *peer) handleLinkFailure(failure linkFailureReport) {
	p.WipeChannel(&failure.chanPoint)

	if failure.linkErr.ForceClose {
		peerLog.Warnf("Force closing link(%v)",
			failure.shortChanID)

		closeTx, err := p.server.chainArb.ForceCloseContract(
			failure.chanPoint,
		)
		switch {
		case err != nil:
			peerLog.Errorf("unable to force close "+
				"link(%v): %v", failure.shortChanID, err)

		default:
			peerLog.Infof("channel(%v) force "+
				"closed with txid %v",
				failure.shortChanID, closeTx.TxHash())
		}
	}

	// Nothing more to do unless the remote peer should be told.
	if !failure.linkErr.ShouldSendToPeer() {
		return
	}

	// Default to the error text; an explicit SendData payload wins.
	payload := []byte(failure.linkErr.Error())
	if custom := failure.linkErr.SendData; custom != nil {
		payload = custom
	}

	errMsg := &lnwire.Error{
		ChanID: failure.chanID,
		Data:   payload,
	}
	if err := p.SendMessage(true, errMsg); err != nil {
		peerLog.Errorf("unable to send msg to "+
			"remote peer: %v", err)
	}
}
// finalizeChanClosure completes a channel close driven by chanCloser. It
// wipes the channel, reports the closing txid to the close requester (if
// any) as a pendingUpdate, and starts a goroutine that waits for the closing
// transaction to confirm and then sends a channelCloseUpdate to the
// requester.
//
// If the closing transaction cannot be obtained from chanCloser, the error
// is logged, forwarded to the requester when there is one, and the function
// returns without waiting for any confirmation.
func (p *peer) finalizeChanClosure(chanCloser *channelCloser) {
	// closeReq is nil when no close request is attached to the closer.
	closeReq := chanCloser.CloseRequest()

	// Remove the channel from activeChannels and the switch first.
	chanPoint := chanCloser.cfg.channel.ChannelPoint()
	p.WipeChannel(chanPoint)

	notifier := p.server.cc.chainNotifier

	// Errors from waitForChanToClose go to the requester when there is
	// one; otherwise they land in a buffered channel nobody reads.
	errChan := make(chan error, 1)
	if closeReq != nil {
		errChan = closeReq.Err
	}

	closingTx, err := chanCloser.ClosingTx()
	if err != nil {
		peerLog.Error(err)
		if closeReq != nil {
			closeReq.Err <- err
		}

		// Without a closing transaction there is nothing to hash or
		// watch; continuing would dereference an invalid transaction.
		return
	}

	closingTxid := closingTx.TxHash()

	// Tell the requester which transaction is pending confirmation.
	if closeReq != nil {
		closeReq.Updates <- &pendingUpdate{
			Txid: closingTxid[:],
		}
	}

	go waitForChanToClose(chanCloser.negotiationHeight, notifier, errChan,
		chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() {
			// Runs once the closing transaction has confirmed.
			if closeReq != nil {
				closeReq.Updates <- &channelCloseUpdate{
					ClosingTxid: closingTxid[:],
					Success:     true,
				}
			}
		})
}
// waitForChanToClose registers a confirmation notification for closingTxID
// (with closeScript and bestHeight as registration arguments) and blocks
// until it fires, then invokes cb.
//
// If registration fails the error is sent on errChan (when non-nil) and cb
// is not called. If the Confirmed channel is closed before delivering a
// value, the function returns silently without calling cb.
func waitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
	errChan chan error, chanPoint *wire.OutPoint,
	closingTxID *chainhash.Hash, closeScript []byte, cb func()) {

	peerLog.Infof("Waiting for confirmation of cooperative close of "+
		"ChannelPoint(%v) with txid: %v", chanPoint,
		closingTxID)

	// NOTE(review): the literal 1 is presumably the number of required
	// confirmations; confirm against the ChainNotifier interface.
	ntfn, regErr := notifier.RegisterConfirmationsNtfn(
		closingTxID, closeScript, 1, bestHeight,
	)
	if regErr != nil {
		if errChan != nil {
			errChan <- regErr
		}
		return
	}

	// A closed Confirmed channel yields ok == false; bail out quietly.
	conf, delivered := <-ntfn.Confirmed
	if delivered {
		peerLog.Infof("ChannelPoint(%v) is now closed at "+
			"height %v", chanPoint, conf.BlockHeight)

		cb()
	}
}
// WipeChannel forgets the channel identified by chanPoint: its entry is
// deleted from p.activeChannels under the write lock, and the corresponding
// link is then removed from the HTLC switch.
func (p *peer) WipeChannel(chanPoint *wire.OutPoint) {
	id := lnwire.NewChanIDFromOutPoint(chanPoint)

	// Scope the lock to the map mutation only; the switch is called
	// after the lock has been released.
	func() {
		p.activeChanMtx.Lock()
		defer p.activeChanMtx.Unlock()

		delete(p.activeChannels, id)
	}()

	p.server.htlcSwitch.RemoveLink(id)
}
// handleInitMsg processes the remote peer's Init message. It merges
// msg.GlobalFeatures into msg.Features, stores the resulting feature vector
// in p.remoteFeatures, and validates it with feature.ValidateRequired and
// feature.ValidateDeps. An error is also returned when
// HasFeature(lnwire.DataLossProtectRequired) is false for the remote
// vector.
//
// Note that p.remoteFeatures is assigned before validation, so it remains
// set even when a validation error is returned.
func (p *peer) handleInitMsg(msg *lnwire.Init) error {
	if err := msg.Features.Merge(msg.GlobalFeatures); err != nil {
		return fmt.Errorf("unable to merge legacy global features: %v",
			err)
	}

	p.remoteFeatures = lnwire.NewFeatureVector(
		msg.Features, lnwire.Features,
	)

	if err := feature.ValidateRequired(p.remoteFeatures); err != nil {
		return fmt.Errorf("invalid remote features: %v", err)
	}

	if err := feature.ValidateDeps(p.remoteFeatures); err != nil {
		return fmt.Errorf("invalid remote features: %v", err)
	}

	// Features this node insists on from the remote side.
	if !p.remoteFeatures.HasFeature(lnwire.DataLossProtectRequired) {
		return fmt.Errorf("data loss protection required")
	}

	return nil
}
// LocalFeatures returns p.features, the feature vector whose raw form
// sendInitMsg places in the outgoing Init message. The pointer is returned
// as-is, not copied.
func (p *peer) LocalFeatures() *lnwire.FeatureVector {
return p.features
}
// RemoteFeatures returns p.remoteFeatures, the feature vector that
// handleInitMsg builds from the remote peer's Init message. The pointer is
// returned as-is, not copied.
//
// NOTE(review): presumably nil until handleInitMsg has run — confirm that no
// other code path assigns the field.
func (p *peer) RemoteFeatures() *lnwire.FeatureVector {
return p.remoteFeatures
}
// sendInitMsg builds an Init message from the raw feature vectors of
// p.legacyFeatures and p.features (in that order) and writes it to the peer
// via writeMessage, returning whatever error that call reports.
func (p *peer) sendInitMsg() error {
	legacy := p.legacyFeatures.RawFeatureVector
	current := p.features.RawFeatureVector

	return p.writeMessage(lnwire.NewInitMessage(legacy, current))
}
// resendChanSyncMsg re-sends the channel sync message stored with the closed
// channel cid, at most once per entry in p.resentChanSyncMsg.
//
// It returns nil immediately if cid was already recorded as re-sent. It
// returns an error if the closed channel cannot be fetched, if it has no
// stored sync message, if its remote public key differs from this peer's
// identity key, or if the synchronous send fails. On success cid is recorded
// in p.resentChanSyncMsg.
func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error {
	if _, done := p.resentChanSyncMsg[cid]; done {
		return nil
	}

	closed, err := p.server.chanDB.FetchClosedChannelForID(cid)
	if err != nil {
		return fmt.Errorf("unable to fetch channel sync messages for "+
			"peer %v: %v", p, err)
	}

	switch {
	// Nothing was stored for this channel.
	case closed.LastChanSyncMsg == nil:
		return fmt.Errorf("no chan sync message stored for channel %v",
			cid)

	// The channel belongs to a different remote key than this peer's.
	case !closed.RemotePub.IsEqual(p.IdentityKey()):
		return fmt.Errorf("ignoring channel reestablish from "+
			"peer=%x", p.IdentityKey())
	}

	peerLog.Debugf("Re-sending channel sync message for channel %v to "+
		"peer %v", cid, p)

	err = p.SendMessage(true, closed.LastChanSyncMsg)
	if err != nil {
		return fmt.Errorf("failed resending channel sync "+
			"message to peer %v: %v", p, err)
	}

	peerLog.Debugf("Re-sent channel sync message for channel %v to peer "+
		"%v", cid, p)

	// Remember the send so later calls for cid become no-ops.
	p.resentChanSyncMsg[cid] = struct{}{}

	return nil
}
// SendMessage queues msgs for the remote peer through sendMessage with the
// priority flag set to true, so each message is enqueued with queueMsg. When
// sync is false the call returns once the messages are queued; when sync is
// true it blocks inside sendMessage until a reply arrives on the per-message
// error channel or the peer or server quits.
func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error {
return p.sendMessage(sync, true, msgs...)
}
// SendMessageLazy queues msgs for the remote peer through sendMessage with
// the priority flag set to false, so each message is enqueued with
// queueMsgLazy instead of queueMsg. The sync flag has the same meaning as
// for SendMessage.
func (p *peer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
return p.sendMessage(sync, false, msgs...)
}
// sendMessage enqueues every message in msgs, using queueMsg when priority
// is true and queueMsgLazy otherwise.
//
// When sync is false the messages are queued without error channels and nil
// is returned immediately. When sync is true each message gets its own
// buffered error channel and the call waits for a reply on every one of
// them, in order. The first non-nil error is returned. If the peer or the
// server quits while waiting, lnpeer.ErrPeerExiting is returned. nil is
// returned only after all messages have been acknowledged without error.
func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error {
	// Only a sync send needs to track per-message replies.
	var errChans []chan error
	if sync {
		errChans = make([]chan error, 0, len(msgs))
	}

	for _, msg := range msgs {
		// errChan stays nil for async sends, so no reply is awaited.
		var errChan chan error
		if sync {
			errChan = make(chan error, 1)
			errChans = append(errChans, errChan)
		}

		if priority {
			p.queueMsg(msg, errChan)
		} else {
			p.queueMsgLazy(msg, errChan)
		}
	}

	// Wait for every reply. A nil reply must not end the wait, or a sync
	// send of several messages would return after the first one only.
	for _, errChan := range errChans {
		select {
		case err := <-errChan:
			if err != nil {
				return err
			}
		case <-p.quit:
			return lnpeer.ErrPeerExiting
		case <-p.server.quit:
			return lnpeer.ErrPeerExiting
		}
	}

	return nil
}
// PubKey returns the peer's pubKeyBytes field. The result is a 33-byte array
// returned by value, so callers receive their own copy.
func (p *peer) PubKey() [33]byte {
return p.pubKeyBytes
}
// IdentityKey returns the IdentityKey stored in the peer's network address
// (p.addr). The pointer is returned as-is, not copied.
func (p *peer) IdentityKey() *btcec.PublicKey {
return p.addr.IdentityKey
}
// Address returns the net.Addr stored in the peer's network address
// (p.addr.Address).
func (p *peer) Address() net.Addr {
return p.addr.Address
}
// AddNewChannel hands channel to the receiver of p.newChannels and waits for
// its verdict.
//
// The request is abandoned with an error if cancel fires before it could be
// delivered, and with lnpeer.ErrPeerExiting if the peer quits either before
// delivery or while waiting for the reply. Otherwise the value received on
// the request's error channel is returned; a closed channel yields nil.
func (p *peer) AddNewChannel(channel *channeldb.OpenChannel,
	cancel <-chan struct{}) error {

	// Buffered so the handler can reply even if this call has given up.
	reply := make(chan error, 1)
	request := &newChannelMsg{
		channel: channel,
		err:     reply,
	}

	// Phase one: deliver the request.
	select {
	case p.newChannels <- request:
	case <-cancel:
		return errors.New("canceled adding new channel")
	case <-p.quit:
		return lnpeer.ErrPeerExiting
	}

	// Phase two: wait for the outcome.
	select {
	case <-p.quit:
		return lnpeer.ErrPeerExiting
	case err := <-reply:
		return err
	}
}
// StartTime returns the peer's startTime field.
func (p *peer) StartTime() time.Time {
return p.startTime
}
// LinkUpdater is implemented by values that can report the channel they are
// aimed at.
//
// NOTE(review): presumably satisfied by wire messages that must be routed to
// a specific channel link — confirm against the users of this interface.
type LinkUpdater interface {
// TargetChanID returns the ID of the channel the value targets.
TargetChanID() lnwire.ChannelID
}