package lnd
import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"io"
"math"
"net/http"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcutil/psbt"
"github.com/btcsuite/btcwallet/wallet/txauthor"
"github.com/davecgh/go-spew/spew"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
proxy "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/chanfitness"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/feature"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/macaroons"
"github.com/lightningnetwork/lnd/monitoring"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/record"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/signal"
"github.com/lightningnetwork/lnd/sweep"
"github.com/lightningnetwork/lnd/watchtower"
"github.com/lightningnetwork/lnd/zpay32"
"github.com/tv42/zbase32"
"google.golang.org/grpc"
"gopkg.in/macaroon-bakery.v2/bakery"
)
const (
maxBtcPaymentMSat = lnwire.MilliSatoshi(math.MaxUint32)
maxLtcPaymentMSat = lnwire.MilliSatoshi(math.MaxUint32) *
btcToLtcConversionRate
)
var (
MaxPaymentMSat = maxBtcPaymentMSat
readPermissions = []bakery.Op{
{
Entity: "onchain",
Action: "read",
},
{
Entity: "offchain",
Action: "read",
},
{
Entity: "address",
Action: "read",
},
{
Entity: "message",
Action: "read",
},
{
Entity: "peers",
Action: "read",
},
{
Entity: "info",
Action: "read",
},
{
Entity: "invoices",
Action: "read",
},
{
Entity: "signer",
Action: "read",
},
}
writePermissions = []bakery.Op{
{
Entity: "onchain",
Action: "write",
},
{
Entity: "offchain",
Action: "write",
},
{
Entity: "address",
Action: "write",
},
{
Entity: "message",
Action: "write",
},
{
Entity: "peers",
Action: "write",
},
{
Entity: "info",
Action: "write",
},
{
Entity: "invoices",
Action: "write",
},
{
Entity: "signer",
Action: "generate",
},
{
Entity: "macaroon",
Action: "generate",
},
}
invoicePermissions = []bakery.Op{
{
Entity: "invoices",
Action: "read",
},
{
Entity: "invoices",
Action: "write",
},
{
Entity: "address",
Action: "read",
},
{
Entity: "address",
Action: "write",
},
{
Entity: "onchain",
Action: "read",
},
}
validActions = []string{"read", "write", "generate"}
validEntities = []string{
"onchain", "offchain", "address", "message",
"peers", "info", "invoices", "signer", "macaroon",
}
)
func stringInSlice(a string, slice []string) bool {
for _, b := range slice {
if b == a {
return true
}
}
return false
}
func mainRPCServerPermissions() map[string][]bakery.Op {
return map[string][]bakery.Op{
"/lnrpc.Lightning/SendCoins": {{
Entity: "onchain",
Action: "write",
}},
"/lnrpc.Lightning/ListUnspent": {{
Entity: "onchain",
Action: "read",
}},
"/lnrpc.Lightning/SendMany": {{
Entity: "onchain",
Action: "write",
}},
"/lnrpc.Lightning/NewAddress": {{
Entity: "address",
Action: "write",
}},
"/lnrpc.Lightning/SignMessage": {{
Entity: "message",
Action: "write",
}},
"/lnrpc.Lightning/VerifyMessage": {{
Entity: "message",
Action: "read",
}},
"/lnrpc.Lightning/ConnectPeer": {{
Entity: "peers",
Action: "write",
}},
"/lnrpc.Lightning/DisconnectPeer": {{
Entity: "peers",
Action: "write",
}},
"/lnrpc.Lightning/OpenChannel": {{
Entity: "onchain",
Action: "write",
}, {
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/OpenChannelSync": {{
Entity: "onchain",
Action: "write",
}, {
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/CloseChannel": {{
Entity: "onchain",
Action: "write",
}, {
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/AbandonChannel": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/GetInfo": {{
Entity: "info",
Action: "read",
}},
"/lnrpc.Lightning/ListPeers": {{
Entity: "peers",
Action: "read",
}},
"/lnrpc.Lightning/WalletBalance": {{
Entity: "onchain",
Action: "read",
}},
"/lnrpc.Lightning/EstimateFee": {{
Entity: "onchain",
Action: "read",
}},
"/lnrpc.Lightning/ChannelBalance": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/PendingChannels": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/ListChannels": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/SubscribeChannelEvents": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/ClosedChannels": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/SendPayment": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/SendPaymentSync": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/SendToRoute": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/SendToRouteSync": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/AddInvoice": {{
Entity: "invoices",
Action: "write",
}},
"/lnrpc.Lightning/LookupInvoice": {{
Entity: "invoices",
Action: "read",
}},
"/lnrpc.Lightning/ListInvoices": {{
Entity: "invoices",
Action: "read",
}},
"/lnrpc.Lightning/SubscribeInvoices": {{
Entity: "invoices",
Action: "read",
}},
"/lnrpc.Lightning/SubscribeTransactions": {{
Entity: "onchain",
Action: "read",
}},
"/lnrpc.Lightning/GetTransactions": {{
Entity: "onchain",
Action: "read",
}},
"/lnrpc.Lightning/DescribeGraph": {{
Entity: "info",
Action: "read",
}},
"/lnrpc.Lightning/GetNodeMetrics": {{
Entity: "info",
Action: "read",
}},
"/lnrpc.Lightning/GetChanInfo": {{
Entity: "info",
Action: "read",
}},
"/lnrpc.Lightning/GetNodeInfo": {{
Entity: "info",
Action: "read",
}},
"/lnrpc.Lightning/QueryRoutes": {{
Entity: "info",
Action: "read",
}},
"/lnrpc.Lightning/GetNetworkInfo": {{
Entity: "info",
Action: "read",
}},
"/lnrpc.Lightning/StopDaemon": {{
Entity: "info",
Action: "write",
}},
"/lnrpc.Lightning/SubscribeChannelGraph": {{
Entity: "info",
Action: "read",
}},
"/lnrpc.Lightning/ListPayments": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/DeleteAllPayments": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/DebugLevel": {{
Entity: "info",
Action: "write",
}},
"/lnrpc.Lightning/DecodePayReq": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/FeeReport": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/UpdateChannelPolicy": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/ForwardingHistory": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/RestoreChannelBackups": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/ExportChannelBackup": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/VerifyChanBackup": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/ExportAllChannelBackups": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/SubscribeChannelBackups": {{
Entity: "offchain",
Action: "read",
}},
"/lnrpc.Lightning/ChannelAcceptor": {{
Entity: "onchain",
Action: "write",
}, {
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/BakeMacaroon": {{
Entity: "macaroon",
Action: "generate",
}},
"/lnrpc.Lightning/SubscribePeerEvents": {{
Entity: "peers",
Action: "read",
}},
"/lnrpc.Lightning/FundingStateStep": {{
Entity: "onchain",
Action: "write",
}, {
Entity: "offchain",
Action: "write",
}},
}
}
type rpcServer struct {
started int32 shutdown int32
server *server
subServers []lnrpc.SubServer
grpcServer *grpc.Server
listeners []*ListenerWithSignal
listenerCleanUp []func()
restDialOpts []grpc.DialOption
restProxyDest string
tlsCfg *tls.Config
routerBackend *routerrpc.RouterBackend
chanPredicate *chanacceptor.ChainedAcceptor
quit chan struct{}
macService *macaroons.Service
selfNode route.Vertex
}
var _ lnrpc.LightningServer = (*rpcServer)(nil)
func newRPCServer(s *server, macService *macaroons.Service,
subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption,
restDialOpts []grpc.DialOption, restProxyDest string,
atpl *autopilot.Manager, invoiceRegistry *invoices.InvoiceRegistry,
tower *watchtower.Standalone, tlsCfg *tls.Config,
getListeners rpcListeners,
chanPredicate *chanacceptor.ChainedAcceptor) (*rpcServer, error) {
channelGraph := s.chanDB.ChannelGraph()
selfNode, err := channelGraph.SourceNode()
if err != nil {
return nil, err
}
graph := s.chanDB.ChannelGraph()
routerBackend := &routerrpc.RouterBackend{
MaxPaymentMSat: MaxPaymentMSat,
SelfNode: selfNode.PubKeyBytes,
FetchChannelCapacity: func(chanID uint64) (btcutil.Amount,
error) {
info, _, _, err := graph.FetchChannelEdgesByID(chanID)
if err != nil {
return 0, err
}
return info.Capacity, nil
},
FetchChannelEndpoints: func(chanID uint64) (route.Vertex,
route.Vertex, error) {
info, _, _, err := graph.FetchChannelEdgesByID(
chanID,
)
if err != nil {
return route.Vertex{}, route.Vertex{},
fmt.Errorf("unable to fetch channel "+
"edges by channel ID %d: %v",
chanID, err)
}
return info.NodeKey1Bytes, info.NodeKey2Bytes, nil
},
FindRoute: s.chanRouter.FindRoute,
MissionControl: s.missionControl,
ActiveNetParams: activeNetParams.Params,
Tower: s.controlTower,
MaxTotalTimelock: cfg.MaxOutgoingCltvExpiry,
DefaultFinalCltvDelta: uint16(cfg.Bitcoin.TimeLockDelta),
SubscribeHtlcEvents: s.htlcNotifier.SubscribeHtlcEvents,
}
genInvoiceFeatures := func() *lnwire.FeatureVector {
return s.featureMgr.Get(feature.SetInvoice)
}
var (
subServers []lnrpc.SubServer
subServerPerms []lnrpc.MacaroonPerms
)
err = subServerCgs.PopulateDependencies(
s.cc, networkDir, macService, atpl, invoiceRegistry,
s.htlcSwitch, activeNetParams.Params, s.chanRouter,
routerBackend, s.nodeSigner, s.chanDB, s.sweeper, tower,
s.towerClient, cfg.net.ResolveTCPAddr, genInvoiceFeatures,
)
if err != nil {
return nil, err
}
registeredSubServers := lnrpc.RegisteredSubServers()
for _, subServer := range registeredSubServers {
subServerInstance, macPerms, err := subServer.New(subServerCgs)
if err != nil {
return nil, err
}
subServers = append(subServers, subServerInstance)
subServerPerms = append(subServerPerms, macPerms)
}
permissions := mainRPCServerPermissions()
for _, subServerPerm := range subServerPerms {
for method, ops := range subServerPerm {
if _, ok := permissions[method]; ok {
return nil, fmt.Errorf("detected duplicate "+
"macaroon constraints for path: %v",
method)
}
permissions[method] = ops
}
}
macUnaryInterceptors := []grpc.UnaryServerInterceptor{}
macStrmInterceptors := []grpc.StreamServerInterceptor{}
if macService != nil {
unaryInterceptor := macService.UnaryServerInterceptor(permissions)
macUnaryInterceptors = append(macUnaryInterceptors, unaryInterceptor)
strmInterceptor := macService.StreamServerInterceptor(permissions)
macStrmInterceptors = append(macStrmInterceptors, strmInterceptor)
}
promUnaryInterceptors, promStrmInterceptors := monitoring.GetPromInterceptors()
unaryInterceptors := append(macUnaryInterceptors, promUnaryInterceptors...)
strmInterceptors := append(macStrmInterceptors, promStrmInterceptors...)
unaryInterceptors = append(
unaryInterceptors, errorLogUnaryServerInterceptor(rpcsLog),
)
strmInterceptors = append(
strmInterceptors, errorLogStreamServerInterceptor(rpcsLog),
)
listeners, cleanup, err := getListeners()
if err != nil {
return nil, err
}
if len(unaryInterceptors) != 0 && len(strmInterceptors) != 0 {
chainedUnary := grpc_middleware.WithUnaryServerChain(
unaryInterceptors...,
)
chainedStream := grpc_middleware.WithStreamServerChain(
strmInterceptors...,
)
serverOpts = append(serverOpts, chainedUnary, chainedStream)
}
grpcServer := grpc.NewServer(serverOpts...)
rootRPCServer := &rpcServer{
restDialOpts: restDialOpts,
listeners: listeners,
listenerCleanUp: []func(){cleanup},
restProxyDest: restProxyDest,
subServers: subServers,
tlsCfg: tlsCfg,
grpcServer: grpcServer,
server: s,
routerBackend: routerBackend,
chanPredicate: chanPredicate,
quit: make(chan struct{}, 1),
macService: macService,
selfNode: selfNode.PubKeyBytes,
}
lnrpc.RegisterLightningServer(grpcServer, rootRPCServer)
for _, subServer := range subServers {
err := subServer.RegisterWithRootServer(grpcServer)
if err != nil {
return nil, fmt.Errorf("unable to register "+
"sub-server %v with root: %v",
subServer.Name(), err)
}
}
return rootRPCServer, nil
}
func (r *rpcServer) Start() error {
if atomic.AddInt32(&r.started, 1) != 1 {
return nil
}
for _, subServer := range r.subServers {
rpcsLog.Debugf("Starting sub RPC server: %v", subServer.Name())
if err := subServer.Start(); err != nil {
return err
}
}
for _, lis := range r.listeners {
go func(lis *ListenerWithSignal) {
rpcsLog.Infof("RPC server listening on %s", lis.Addr())
close(lis.Ready)
r.grpcServer.Serve(lis)
}(lis)
}
if cfg.Prometheus.Enabled() {
err := monitoring.ExportPrometheusMetrics(
r.grpcServer, cfg.Prometheus,
)
if err != nil {
return err
}
}
customMarshalerOption := proxy.WithMarshalerOption(
proxy.MIMEWildcard, &proxy.JSONPb{
OrigName: true,
EmitDefaults: true,
},
)
mux := proxy.NewServeMux(customMarshalerOption)
err := lnrpc.RegisterLightningHandlerFromEndpoint(
context.Background(), mux, r.restProxyDest,
r.restDialOpts,
)
if err != nil {
return err
}
for _, restEndpoint := range cfg.RESTListeners {
lis, err := lncfg.TLSListenOnAddress(restEndpoint, r.tlsCfg)
if err != nil {
ltndLog.Errorf(
"gRPC proxy unable to listen on %s",
restEndpoint,
)
return err
}
r.listenerCleanUp = append(r.listenerCleanUp, func() {
lis.Close()
})
go func() {
rpcsLog.Infof("gRPC proxy started at %s", lis.Addr())
http.Serve(lis, mux)
}()
}
return nil
}
func (r *rpcServer) Stop() error {
if atomic.AddInt32(&r.shutdown, 1) != 1 {
return nil
}
rpcsLog.Infof("Stopping RPC Server")
close(r.quit)
for _, subServer := range r.subServers {
rpcsLog.Infof("Stopping %v Sub-RPC Server",
subServer.Name())
if err := subServer.Stop(); err != nil {
rpcsLog.Errorf("unable to stop sub-server %v: %v",
subServer.Name(), err)
continue
}
}
for _, cleanUp := range r.listenerCleanUp {
cleanUp()
}
return nil
}
func addrPairsToOutputs(addrPairs map[string]int64) ([]*wire.TxOut, error) {
outputs := make([]*wire.TxOut, 0, len(addrPairs))
for addr, amt := range addrPairs {
addr, err := btcutil.DecodeAddress(addr, activeNetParams.Params)
if err != nil {
return nil, err
}
pkscript, err := txscript.PayToAddrScript(addr)
if err != nil {
return nil, err
}
outputs = append(outputs, wire.NewTxOut(amt, pkscript))
}
return outputs, nil
}
func (r *rpcServer) sendCoinsOnChain(paymentMap map[string]int64,
feeRate chainfee.SatPerKWeight) (*chainhash.Hash, error) {
outputs, err := addrPairsToOutputs(paymentMap)
if err != nil {
return nil, err
}
tx, err := r.server.cc.wallet.SendOutputs(outputs, feeRate)
if err != nil {
return nil, err
}
txHash := tx.TxHash()
return &txHash, nil
}
func (r *rpcServer) ListUnspent(ctx context.Context,
in *lnrpc.ListUnspentRequest) (*lnrpc.ListUnspentResponse, error) {
minConfs := in.MinConfs
maxConfs := in.MaxConfs
switch {
case minConfs < 0:
return nil, fmt.Errorf("min confirmations must be >= 0")
case minConfs > maxConfs:
return nil, fmt.Errorf("max confirmations must be >= min " +
"confirmations")
}
utxos, err := r.server.cc.wallet.ListUnspentWitness(minConfs, maxConfs)
if err != nil {
return nil, err
}
resp := &lnrpc.ListUnspentResponse{
Utxos: make([]*lnrpc.Utxo, 0, len(utxos)),
}
for _, utxo := range utxos {
var addrType lnrpc.AddressType
switch utxo.AddressType {
case lnwallet.WitnessPubKey:
addrType = lnrpc.AddressType_WITNESS_PUBKEY_HASH
case lnwallet.NestedWitnessPubKey:
addrType = lnrpc.AddressType_NESTED_PUBKEY_HASH
case lnwallet.UnknownAddressType:
rpcsLog.Warnf("[listunspent] utxo with address of "+
"unknown type ignored: %v",
utxo.OutPoint.String())
continue
default:
return nil, fmt.Errorf("invalid utxo address type")
}
outpoint := &lnrpc.OutPoint{
TxidBytes: utxo.OutPoint.Hash[:],
TxidStr: utxo.OutPoint.Hash.String(),
OutputIndex: utxo.OutPoint.Index,
}
utxoResp := lnrpc.Utxo{
AddressType: addrType,
AmountSat: int64(utxo.Value),
PkScript: hex.EncodeToString(utxo.PkScript),
Outpoint: outpoint,
Confirmations: utxo.Confirmations,
}
_, outAddresses, _, err := txscript.ExtractPkScriptAddrs(
utxo.PkScript, activeNetParams.Params,
)
if err != nil {
return nil, err
}
if len(outAddresses) != 1 {
return nil, fmt.Errorf("an output was unexpectedly " +
"multisig")
}
utxoResp.Address = outAddresses[0].String()
resp.Utxos = append(resp.Utxos, &utxoResp)
}
maxStr := ""
if maxConfs != math.MaxInt32 {
maxStr = " max=" + fmt.Sprintf("%d", maxConfs)
}
rpcsLog.Debugf("[listunspent] min=%v%v, generated utxos: %v", minConfs,
maxStr, utxos)
return resp, nil
}
func (r *rpcServer) EstimateFee(ctx context.Context,
in *lnrpc.EstimateFeeRequest) (*lnrpc.EstimateFeeResponse, error) {
outputs, err := addrPairsToOutputs(in.AddrToAmount)
if err != nil {
return nil, err
}
target := in.TargetConf
feePerKw, err := sweep.DetermineFeePerKw(
r.server.cc.feeEstimator, sweep.FeePreference{
ConfTarget: uint32(target),
},
)
if err != nil {
return nil, err
}
var tx *txauthor.AuthoredTx
wallet := r.server.cc.wallet
err = wallet.WithCoinSelectLock(func() error {
tx, err = wallet.CreateSimpleTx(outputs, feePerKw, true)
return err
})
if err != nil {
return nil, err
}
totalOutput := int64(0)
for _, out := range tx.Tx.TxOut {
totalOutput += out.Value
}
totalFee := int64(tx.TotalInput) - totalOutput
resp := &lnrpc.EstimateFeeResponse{
FeeSat: totalFee,
FeerateSatPerByte: int64(feePerKw.FeePerKVByte() / 1000),
}
rpcsLog.Debugf("[estimatefee] fee estimate for conf target %d: %v",
target, resp)
return resp, nil
}
func (r *rpcServer) SendCoins(ctx context.Context,
in *lnrpc.SendCoinsRequest) (*lnrpc.SendCoinsResponse, error) {
satPerKw := chainfee.SatPerKVByte(in.SatPerByte * 1000).FeePerKWeight()
feePerKw, err := sweep.DetermineFeePerKw(
r.server.cc.feeEstimator, sweep.FeePreference{
ConfTarget: uint32(in.TargetConf),
FeeRate: satPerKw,
},
)
if err != nil {
return nil, err
}
rpcsLog.Infof("[sendcoins] addr=%v, amt=%v, sat/kw=%v, sweep_all=%v",
in.Addr, btcutil.Amount(in.Amount), int64(feePerKw),
in.SendAll)
targetAddr, err := btcutil.DecodeAddress(in.Addr, activeNetParams.Params)
if err != nil {
return nil, err
}
if !targetAddr.IsForNet(activeNetParams.Params) {
return nil, fmt.Errorf("address: %v is not valid for this "+
"network: %v", targetAddr.String(),
activeNetParams.Params.Name)
}
decodedAddr, _ := hex.DecodeString(in.Addr)
_, err = btcec.ParsePubKey(decodedAddr, btcec.S256())
if err == nil {
return nil, fmt.Errorf("cannot send coins to pubkeys")
}
var txid *chainhash.Hash
wallet := r.server.cc.wallet
if in.SendAll {
if in.Amount != 0 {
return nil, fmt.Errorf("amount set while SendAll is " +
"active")
}
_, bestHeight, err := r.server.cc.chainIO.GetBestBlock()
if err != nil {
return nil, err
}
sweepTxPkg, err := sweep.CraftSweepAllTx(
feePerKw, uint32(bestHeight), targetAddr, wallet,
wallet.WalletController, wallet.WalletController,
r.server.cc.feeEstimator, r.server.cc.signer,
)
if err != nil {
return nil, err
}
rpcsLog.Debugf("Sweeping all coins from wallet to addr=%v, "+
"with tx=%v", in.Addr, spew.Sdump(sweepTxPkg.SweepTx))
err = wallet.PublishTransaction(sweepTxPkg.SweepTx)
if err != nil {
sweepTxPkg.CancelSweepAttempt()
return nil, fmt.Errorf("unable to broadcast sweep "+
"transaction: %v", err)
}
sweepTXID := sweepTxPkg.SweepTx.TxHash()
txid = &sweepTXID
} else {
paymentMap := map[string]int64{targetAddr.String(): in.Amount}
err := wallet.WithCoinSelectLock(func() error {
newTXID, err := r.sendCoinsOnChain(paymentMap, feePerKw)
if err != nil {
return err
}
txid = newTXID
return nil
})
if err != nil {
return nil, err
}
}
rpcsLog.Infof("[sendcoins] spend generated txid: %v", txid.String())
return &lnrpc.SendCoinsResponse{Txid: txid.String()}, nil
}
func (r *rpcServer) SendMany(ctx context.Context,
in *lnrpc.SendManyRequest) (*lnrpc.SendManyResponse, error) {
satPerKw := chainfee.SatPerKVByte(in.SatPerByte * 1000).FeePerKWeight()
feePerKw, err := sweep.DetermineFeePerKw(
r.server.cc.feeEstimator, sweep.FeePreference{
ConfTarget: uint32(in.TargetConf),
FeeRate: satPerKw,
},
)
if err != nil {
return nil, err
}
rpcsLog.Infof("[sendmany] outputs=%v, sat/kw=%v",
spew.Sdump(in.AddrToAmount), int64(feePerKw))
var txid *chainhash.Hash
wallet := r.server.cc.wallet
err = wallet.WithCoinSelectLock(func() error {
sendManyTXID, err := r.sendCoinsOnChain(
in.AddrToAmount, feePerKw,
)
if err != nil {
return err
}
txid = sendManyTXID
return nil
})
if err != nil {
return nil, err
}
rpcsLog.Infof("[sendmany] spend generated txid: %v", txid.String())
return &lnrpc.SendManyResponse{Txid: txid.String()}, nil
}
func (r *rpcServer) NewAddress(ctx context.Context,
in *lnrpc.NewAddressRequest) (*lnrpc.NewAddressResponse, error) {
var (
addr btcutil.Address
err error
)
switch in.Type {
case lnrpc.AddressType_WITNESS_PUBKEY_HASH:
addr, err = r.server.cc.wallet.NewAddress(
lnwallet.WitnessPubKey, false,
)
if err != nil {
return nil, err
}
case lnrpc.AddressType_NESTED_PUBKEY_HASH:
addr, err = r.server.cc.wallet.NewAddress(
lnwallet.NestedWitnessPubKey, false,
)
if err != nil {
return nil, err
}
case lnrpc.AddressType_UNUSED_WITNESS_PUBKEY_HASH:
addr, err = r.server.cc.wallet.LastUnusedAddress(
lnwallet.WitnessPubKey,
)
if err != nil {
return nil, err
}
case lnrpc.AddressType_UNUSED_NESTED_PUBKEY_HASH:
addr, err = r.server.cc.wallet.LastUnusedAddress(
lnwallet.NestedWitnessPubKey,
)
if err != nil {
return nil, err
}
}
rpcsLog.Debugf("[newaddress] type=%v addr=%v", in.Type, addr.String())
return &lnrpc.NewAddressResponse{Address: addr.String()}, nil
}
var (
signedMsgPrefix = []byte("Lightning Signed Message:")
)
func (r *rpcServer) SignMessage(ctx context.Context,
in *lnrpc.SignMessageRequest) (*lnrpc.SignMessageResponse, error) {
if in.Msg == nil {
return nil, fmt.Errorf("need a message to sign")
}
in.Msg = append(signedMsgPrefix, in.Msg...)
sigBytes, err := r.server.nodeSigner.SignCompact(in.Msg)
if err != nil {
return nil, err
}
sig := zbase32.EncodeToString(sigBytes)
return &lnrpc.SignMessageResponse{Signature: sig}, nil
}
func (r *rpcServer) VerifyMessage(ctx context.Context,
in *lnrpc.VerifyMessageRequest) (*lnrpc.VerifyMessageResponse, error) {
if in.Msg == nil {
return nil, fmt.Errorf("need a message to verify")
}
sig, err := zbase32.DecodeString(in.Signature)
if err != nil {
return nil, fmt.Errorf("failed to decode signature: %v", err)
}
in.Msg = append(signedMsgPrefix, in.Msg...)
digest := chainhash.DoubleHashB(in.Msg)
pubKey, _, err := btcec.RecoverCompact(btcec.S256(), sig, digest)
if err != nil {
return &lnrpc.VerifyMessageResponse{Valid: false}, nil
}
pubKeyHex := hex.EncodeToString(pubKey.SerializeCompressed())
var pub [33]byte
copy(pub[:], pubKey.SerializeCompressed())
graph := r.server.chanDB.ChannelGraph()
_, active, err := graph.HasLightningNode(pub)
if err != nil {
return nil, fmt.Errorf("failed to query graph: %v", err)
}
return &lnrpc.VerifyMessageResponse{
Valid: active,
Pubkey: pubKeyHex,
}, nil
}
func (r *rpcServer) ConnectPeer(ctx context.Context,
in *lnrpc.ConnectPeerRequest) (*lnrpc.ConnectPeerResponse, error) {
if !r.server.Started() {
return nil, ErrServerNotActive
}
if in.Addr == nil {
return nil, fmt.Errorf("need: lnc pubkeyhash@hostname")
}
pubkeyHex, err := hex.DecodeString(in.Addr.Pubkey)
if err != nil {
return nil, err
}
pubKey, err := btcec.ParsePubKey(pubkeyHex, btcec.S256())
if err != nil {
return nil, err
}
if pubKey.IsEqual(r.server.identityPriv.PubKey()) {
return nil, fmt.Errorf("cannot make connection to self")
}
addr, err := parseAddr(in.Addr.Host)
if err != nil {
return nil, err
}
peerAddr := &lnwire.NetAddress{
IdentityKey: pubKey,
Address: addr,
ChainNet: activeNetParams.Net,
}
rpcsLog.Debugf("[connectpeer] requested connection to %x@%s",
peerAddr.IdentityKey.SerializeCompressed(), peerAddr.Address)
if err := r.server.ConnectToPeer(peerAddr, in.Perm); err != nil {
rpcsLog.Errorf("[connectpeer]: error connecting to peer: %v", err)
return nil, err
}
rpcsLog.Debugf("Connected to peer: %v", peerAddr.String())
return &lnrpc.ConnectPeerResponse{}, nil
}
func (r *rpcServer) DisconnectPeer(ctx context.Context,
in *lnrpc.DisconnectPeerRequest) (*lnrpc.DisconnectPeerResponse, error) {
rpcsLog.Debugf("[disconnectpeer] from peer(%s)", in.PubKey)
if !r.server.Started() {
return nil, ErrServerNotActive
}
pubKeyBytes, err := hex.DecodeString(in.PubKey)
if err != nil {
return nil, fmt.Errorf("unable to decode pubkey bytes: %v", err)
}
peerPubKey, err := btcec.ParsePubKey(pubKeyBytes, btcec.S256())
if err != nil {
return nil, fmt.Errorf("unable to parse pubkey: %v", err)
}
nodeChannels, err := r.server.chanDB.FetchOpenChannels(peerPubKey)
if err != nil {
return nil, fmt.Errorf("unable to fetch channels for peer: %v", err)
}
if len(nodeChannels) > 0 && !cfg.UnsafeDisconnect {
return nil, fmt.Errorf("cannot disconnect from peer(%x), "+
"all active channels with the peer need to be closed "+
"first", pubKeyBytes)
}
if err := r.server.DisconnectPeer(peerPubKey); err != nil {
return nil, fmt.Errorf("unable to disconnect peer: %v", err)
}
return &lnrpc.DisconnectPeerResponse{}, nil
}
func extractOpenChannelMinConfs(in *lnrpc.OpenChannelRequest) (int32, error) {
switch {
case in.MinConfs < 0:
return 0, errors.New("minimum number of confirmations must " +
"be a non-negative number")
case in.MinConfs == 0 && !in.SpendUnconfirmed:
return 1, nil
case in.MinConfs > 0 && in.SpendUnconfirmed:
return 0, errors.New("SpendUnconfirmed set to true with " +
"MinConfs > 0")
case in.SpendUnconfirmed:
return 0, nil
default:
return in.MinConfs, nil
}
}
func newFundingShimAssembler(chanPointShim *lnrpc.ChanPointShim, initiator bool,
keyRing keychain.KeyRing) (chanfunding.Assembler, error) {
switch {
case chanPointShim.RemoteKey == nil:
return nil, fmt.Errorf("remote key not set")
case chanPointShim.LocalKey == nil:
return nil, fmt.Errorf("local key desc not set")
case chanPointShim.LocalKey.RawKeyBytes == nil:
return nil, fmt.Errorf("local raw key bytes not set")
case chanPointShim.LocalKey.KeyLoc == nil:
return nil, fmt.Errorf("local key loc not set")
case chanPointShim.ChanPoint == nil:
return nil, fmt.Errorf("chan point not set")
case len(chanPointShim.PendingChanId) != 32:
return nil, fmt.Errorf("pending chan ID not set")
}
index := chanPointShim.ChanPoint.OutputIndex
txid, err := GetChanPointFundingTxid(chanPointShim.ChanPoint)
if err != nil {
return nil, err
}
chanPoint := wire.NewOutPoint(txid, index)
remoteKey, err := btcec.ParsePubKey(
chanPointShim.RemoteKey, btcec.S256(),
)
if err != nil {
return nil, err
}
shimKeyDesc := chanPointShim.LocalKey
localKey, err := btcec.ParsePubKey(
shimKeyDesc.RawKeyBytes, btcec.S256(),
)
if err != nil {
return nil, err
}
localKeyDesc := keychain.KeyDescriptor{
PubKey: localKey,
KeyLocator: keychain.KeyLocator{
Family: keychain.KeyFamily(
shimKeyDesc.KeyLoc.KeyFamily,
),
Index: uint32(shimKeyDesc.KeyLoc.KeyIndex),
},
}
derivedKey, err := keyRing.DeriveKey(localKeyDesc.KeyLocator)
if err != nil {
return nil, err
}
if !derivedKey.PubKey.IsEqual(localKey) {
return nil, fmt.Errorf("KeyLocator does not match attached " +
"raw pubkey")
}
return chanfunding.NewCannedAssembler(
chanPointShim.ThawHeight, *chanPoint,
btcutil.Amount(chanPointShim.Amt), &localKeyDesc,
remoteKey, initiator,
), nil
}
func newPsbtAssembler(req *lnrpc.OpenChannelRequest, normalizedMinConfs int32,
psbtShim *lnrpc.PsbtShim, netParams *chaincfg.Params) (
chanfunding.Assembler, error) {
var (
packet *psbt.Packet
err error
)
if len(psbtShim.PendingChanId) != 32 {
return nil, fmt.Errorf("pending chan ID not set")
}
if normalizedMinConfs != 1 {
return nil, fmt.Errorf("setting non-default values for " +
"minimum confirmation is not supported for PSBT " +
"funding")
}
if req.SatPerByte != 0 || req.TargetConf != 0 {
return nil, fmt.Errorf("specifying fee estimation parameters " +
"is not supported for PSBT funding")
}
if len(psbtShim.BasePsbt) > 0 {
packet, err = psbt.NewFromRawBytes(
bytes.NewReader(psbtShim.BasePsbt), false,
)
if err != nil {
return nil, fmt.Errorf("error parsing base PSBT: %v",
err)
}
}
return chanfunding.NewPsbtAssembler(
btcutil.Amount(req.LocalFundingAmount), packet, netParams,
), nil
}
func (r *rpcServer) canOpenChannel() error {
if !r.server.Started() {
return ErrServerNotActive
}
isSynced, _, err := r.server.cc.wallet.IsSynced()
if err != nil {
return err
}
if !isSynced {
return errors.New("channels cannot be created before the " +
"wallet is fully synced")
}
return nil
}
func (r *rpcServer) parseOpenChannelReq(in *lnrpc.OpenChannelRequest,
isSync bool) (*openChanReq, error) {
rpcsLog.Debugf("[openchannel] request to NodeKey(%x) "+
"allocation(us=%v, them=%v)", in.NodePubkey,
in.LocalFundingAmount, in.PushSat)
localFundingAmt := btcutil.Amount(in.LocalFundingAmount)
remoteInitialBalance := btcutil.Amount(in.PushSat)
minHtlcIn := lnwire.MilliSatoshi(in.MinHtlcMsat)
remoteCsvDelay := uint16(in.RemoteCsvDelay)
if remoteInitialBalance >= localFundingAmt {
return nil, fmt.Errorf("amount pushed to remote peer for " +
"initial state must be below the local funding amount")
}
if localFundingAmt > MaxFundingAmount {
return nil, fmt.Errorf("funding amount is too large, the max "+
"channel size is: %v", MaxFundingAmount)
}
if localFundingAmt < minChanFundingSize {
return nil, fmt.Errorf("channel is too small, the minimum "+
"channel size is: %v SAT", int64(minChanFundingSize))
}
minConfs, err := extractOpenChannelMinConfs(in)
if err != nil {
return nil, err
}
var nodePubKey *btcec.PublicKey
switch {
case len(in.NodePubkey) > 0:
nodePubKey, err = btcec.ParsePubKey(in.NodePubkey, btcec.S256())
if err != nil {
return nil, err
}
case isSync:
keyBytes, err := hex.DecodeString(in.NodePubkeyString)
if err != nil {
return nil, err
}
nodePubKey, err = btcec.ParsePubKey(keyBytes, btcec.S256())
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("NodePubkey is not set")
}
if nodePubKey.IsEqual(r.server.identityPriv.PubKey()) {
return nil, fmt.Errorf("cannot open channel to self")
}
satPerKw := chainfee.SatPerKVByte(in.SatPerByte * 1000).FeePerKWeight()
feeRate, err := sweep.DetermineFeePerKw(
r.server.cc.feeEstimator, sweep.FeePreference{
ConfTarget: uint32(in.TargetConf),
FeeRate: satPerKw,
},
)
if err != nil {
return nil, err
}
rpcsLog.Debugf("[openchannel]: using fee of %v sat/kw for funding tx",
int64(feeRate))
script, err := parseUpfrontShutdownAddress(in.CloseAddress)
if err != nil {
return nil, fmt.Errorf("error parsing upfront shutdown: %v",
err)
}
return &openChanReq{
targetPubkey: nodePubKey,
chainHash: *activeNetParams.GenesisHash,
localFundingAmt: localFundingAmt,
pushAmt: lnwire.NewMSatFromSatoshis(remoteInitialBalance),
minHtlcIn: minHtlcIn,
fundingFeePerKw: feeRate,
private: in.Private,
remoteCsvDelay: remoteCsvDelay,
minConfs: minConfs,
shutdownScript: script,
}, nil
}
func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
updateStream lnrpc.Lightning_OpenChannelServer) error {
if err := r.canOpenChannel(); err != nil {
return err
}
req, err := r.parseOpenChannelReq(in, false)
if err != nil {
return err
}
if in.FundingShim != nil {
switch {
case in.FundingShim.GetChanPointShim() != nil:
chanPointShim := in.FundingShim.GetChanPointShim()
copy(req.pendingChanID[:], chanPointShim.PendingChanId)
req.chanFunder, err = newFundingShimAssembler(
chanPointShim, true, r.server.cc.keyRing,
)
if err != nil {
return err
}
case in.FundingShim.GetPsbtShim() != nil:
psbtShim := in.FundingShim.GetPsbtShim()
copy(req.pendingChanID[:], psbtShim.PendingChanId)
req.chanFunder, err = newPsbtAssembler(
in, req.minConfs, psbtShim,
&r.server.cc.wallet.Cfg.NetParams,
)
if err != nil {
return err
}
}
}
updateChan, errChan := r.server.OpenChannel(req)
var outpoint wire.OutPoint
out:
for {
select {
case err := <-errChan:
rpcsLog.Errorf("unable to open channel to NodeKey(%x): %v",
req.targetPubkey.SerializeCompressed(), err)
return err
case fundingUpdate := <-updateChan:
rpcsLog.Tracef("[openchannel] sending update: %v",
fundingUpdate)
if err := updateStream.Send(fundingUpdate); err != nil {
return err
}
switch update := fundingUpdate.Update.(type) {
case *lnrpc.OpenStatusUpdate_ChanOpen:
chanPoint := update.ChanOpen.ChannelPoint
txid, err := GetChanPointFundingTxid(chanPoint)
if err != nil {
return err
}
outpoint = wire.OutPoint{
Hash: *txid,
Index: chanPoint.OutputIndex,
}
break out
}
case <-r.quit:
return nil
}
}
rpcsLog.Tracef("[openchannel] success NodeKey(%x), ChannelPoint(%v)",
req.targetPubkey.SerializeCompressed(), outpoint)
return nil
}
func (r *rpcServer) OpenChannelSync(ctx context.Context,
in *lnrpc.OpenChannelRequest) (*lnrpc.ChannelPoint, error) {
if err := r.canOpenChannel(); err != nil {
return nil, err
}
req, err := r.parseOpenChannelReq(in, true)
if err != nil {
return nil, err
}
updateChan, errChan := r.server.OpenChannel(req)
select {
case err := <-errChan:
rpcsLog.Errorf("unable to open channel to NodeKey(%x): %v",
req.targetPubkey.SerializeCompressed(), err)
return nil, err
case fundingUpdate := <-updateChan:
rpcsLog.Tracef("[openchannel] sending update: %v",
fundingUpdate)
openUpdate := fundingUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanPending)
chanUpdate := openUpdate.ChanPending
return &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: chanUpdate.Txid,
},
OutputIndex: chanUpdate.OutputIndex,
}, nil
case <-r.quit:
return nil, nil
}
}
func parseUpfrontShutdownAddress(address string) (lnwire.DeliveryAddress, error) {
if len(address) == 0 {
return nil, nil
}
addr, err := btcutil.DecodeAddress(
address, activeNetParams.Params,
)
if err != nil {
return nil, fmt.Errorf("invalid address: %v", err)
}
return txscript.PayToAddrScript(addr)
}
func GetChanPointFundingTxid(chanPoint *lnrpc.ChannelPoint) (*chainhash.Hash, error) {
var txid []byte
switch chanPoint.GetFundingTxid().(type) {
case *lnrpc.ChannelPoint_FundingTxidBytes:
txid = chanPoint.GetFundingTxidBytes()
case *lnrpc.ChannelPoint_FundingTxidStr:
s := chanPoint.GetFundingTxidStr()
h, err := chainhash.NewHashFromStr(s)
if err != nil {
return nil, err
}
txid = h[:]
}
return chainhash.NewHash(txid)
}
func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
updateStream lnrpc.Lightning_CloseChannelServer) error {
if !r.server.Started() {
return ErrServerNotActive
}
if in.GetChannelPoint() == nil {
return fmt.Errorf("must specify channel point in close channel")
}
if in.Force && (in.SatPerByte != 0 || in.TargetConf != 0) {
return fmt.Errorf("force closing a channel uses a pre-defined fee")
}
force := in.Force
index := in.ChannelPoint.OutputIndex
txid, err := GetChanPointFundingTxid(in.GetChannelPoint())
if err != nil {
rpcsLog.Errorf("[closechannel] unable to get funding txid: %v", err)
return err
}
chanPoint := wire.NewOutPoint(txid, index)
rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v), force=%v",
chanPoint, force)
var (
updateChan chan interface{}
errChan chan error
)
channel, err := r.server.chanDB.FetchChannel(*chanPoint)
if err != nil {
return err
}
_, bestHeight, err := r.server.cc.chainIO.GetBestBlock()
if err != nil {
return err
}
if force {
remotePub := channel.IdentityPub
if peer, err := r.server.FindPeer(remotePub); err == nil {
peer.WipeChannel(&channel.FundingOutpoint)
} else {
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
r.server.htlcSwitch.RemoveLink(chanID)
}
chainArbitrator := r.server.chainArb
closingTx, err := chainArbitrator.ForceCloseContract(
*chanPoint,
)
if err != nil {
rpcsLog.Errorf("unable to force close transaction: %v", err)
return err
}
closingTxid := closingTx.TxHash()
updateChan = make(chan interface{}, 2)
updateChan <- &pendingUpdate{
Txid: closingTxid[:],
}
errChan = make(chan error, 1)
notifier := r.server.cc.chainNotifier
go waitForChanToClose(uint32(bestHeight), notifier, errChan, chanPoint,
&closingTxid, closingTx.TxOut[0].PkScript, func() {
updateChan <- &channelCloseUpdate{
ClosingTxid: closingTxid[:],
Success: true,
}
})
} else {
if channel.ChanType.IsFrozen() && channel.IsInitiator &&
uint32(bestHeight) < channel.ThawHeight {
return fmt.Errorf("cannot co-op close frozen channel "+
"as initiator until height=%v, "+
"(current_height=%v)", channel.ThawHeight,
bestHeight)
}
channelID := lnwire.NewChanIDFromOutPoint(chanPoint)
if _, err := r.server.htlcSwitch.GetLink(channelID); err != nil {
rpcsLog.Debugf("Trying to non-force close offline channel with "+
"chan_point=%v", chanPoint)
return fmt.Errorf("unable to gracefully close channel while peer "+
"is offline (try force closing it instead): %v", err)
}
satPerKw := chainfee.SatPerKVByte(
in.SatPerByte * 1000,
).FeePerKWeight()
feeRate, err := sweep.DetermineFeePerKw(
r.server.cc.feeEstimator, sweep.FeePreference{
ConfTarget: uint32(in.TargetConf),
FeeRate: satPerKw,
},
)
if err != nil {
return err
}
rpcsLog.Debugf("Target sat/kw for closing transaction: %v",
int64(feeRate))
if len(channel.ActiveHtlcs()) != 0 {
return fmt.Errorf("cannot co-op close channel " +
"with active htlcs")
}
var deliveryScript lnwire.DeliveryAddress
if len(in.DeliveryAddress) > 0 {
addr, err := btcutil.DecodeAddress(
in.DeliveryAddress, activeNetParams.Params,
)
if err != nil {
return fmt.Errorf("invalid delivery address: %v", err)
}
deliveryScript, err = txscript.PayToAddrScript(addr)
if err != nil {
return err
}
}
updateChan, errChan = r.server.htlcSwitch.CloseLink(
chanPoint, htlcswitch.CloseRegular, feeRate, deliveryScript,
)
}
out:
for {
select {
case err := <-errChan:
rpcsLog.Errorf("[closechannel] unable to close "+
"ChannelPoint(%v): %v", chanPoint, err)
return err
case closingUpdate := <-updateChan:
rpcClosingUpdate, err := createRPCCloseUpdate(
closingUpdate,
)
if err != nil {
return err
}
rpcsLog.Tracef("[closechannel] sending update: %v",
rpcClosingUpdate)
if err := updateStream.Send(rpcClosingUpdate); err != nil {
return err
}
switch closeUpdate := closingUpdate.(type) {
case *channelCloseUpdate:
h, _ := chainhash.NewHash(closeUpdate.ClosingTxid)
rpcsLog.Infof("[closechannel] close completed: "+
"txid(%v)", h)
break out
}
case <-r.quit:
return nil
}
}
return nil
}
func createRPCCloseUpdate(update interface{}) (
*lnrpc.CloseStatusUpdate, error) {
switch u := update.(type) {
case *channelCloseUpdate:
return &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{
ClosingTxid: u.ClosingTxid,
},
},
}, nil
case *pendingUpdate:
return &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ClosePending{
ClosePending: &lnrpc.PendingUpdate{
Txid: u.Txid,
OutputIndex: u.OutputIndex,
},
},
}, nil
}
return nil, errors.New("unknown close status update")
}
func abandonChanFromGraph(chanGraph *channeldb.ChannelGraph,
chanPoint *wire.OutPoint) error {
chanID, err := chanGraph.ChannelID(chanPoint)
switch {
case err == channeldb.ErrEdgeNotFound:
return nil
case err != nil:
return err
}
return chanGraph.DeleteChannelEdges(chanID)
}
func (r *rpcServer) AbandonChannel(ctx context.Context,
in *lnrpc.AbandonChannelRequest) (*lnrpc.AbandonChannelResponse, error) {
if !build.IsDevBuild() {
return nil, fmt.Errorf("AbandonChannel RPC call only " +
"available in dev builds")
}
txid, err := GetChanPointFundingTxid(in.GetChannelPoint())
if err != nil {
return nil, err
}
index := in.ChannelPoint.OutputIndex
chanPoint := wire.NewOutPoint(txid, index)
_, bestHeight, err := r.server.cc.chainIO.GetBestBlock()
if err != nil {
return nil, err
}
dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
switch {
case err == channeldb.ErrChannelNotFound:
break
case err == nil:
if err := dbChan.MarkBorked(); err != nil {
return nil, err
}
remotePub := dbChan.IdentityPub
if peer, err := r.server.FindPeer(remotePub); err == nil {
peer.WipeChannel(chanPoint)
}
default:
return nil, err
}
err = r.server.chanDB.AbandonChannel(chanPoint, uint32(bestHeight))
if err != nil {
return nil, err
}
err = abandonChanFromGraph(
r.server.chanDB.ChannelGraph(), chanPoint,
)
if err != nil {
return nil, err
}
err = r.server.chainArb.ResolveContract(*chanPoint)
if err != nil {
return nil, err
}
err = r.server.utxoNursery.cfg.Store.RemoveChannel(chanPoint)
if err != nil && err != ErrContractNotFound {
return nil, err
}
r.server.channelNotifier.NotifyClosedChannelEvent(*chanPoint)
return &lnrpc.AbandonChannelResponse{}, nil
}
func (r *rpcServer) GetInfo(ctx context.Context,
in *lnrpc.GetInfoRequest) (*lnrpc.GetInfoResponse, error) {
serverPeers := r.server.Peers()
openChannels, err := r.server.chanDB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
var activeChannels uint32
for _, channel := range openChannels {
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
if r.server.htlcSwitch.HasActiveLink(chanID) {
activeChannels++
}
}
inactiveChannels := uint32(len(openChannels)) - activeChannels
pendingChannels, err := r.server.chanDB.FetchPendingChannels()
if err != nil {
return nil, fmt.Errorf("unable to get retrieve pending "+
"channels: %v", err)
}
nPendingChannels := uint32(len(pendingChannels))
idPub := r.server.identityPriv.PubKey().SerializeCompressed()
encodedIDPub := hex.EncodeToString(idPub)
bestHash, bestHeight, err := r.server.cc.chainIO.GetBestBlock()
if err != nil {
return nil, fmt.Errorf("unable to get best block info: %v", err)
}
isSynced, bestHeaderTimestamp, err := r.server.cc.wallet.IsSynced()
if err != nil {
return nil, fmt.Errorf("unable to sync PoV of the wallet "+
"with current best block in the main chain: %v", err)
}
network := normalizeNetwork(activeNetParams.Name)
activeChains := make([]*lnrpc.Chain, registeredChains.NumActiveChains())
for i, chain := range registeredChains.ActiveChains() {
activeChains[i] = &lnrpc.Chain{
Chain: chain.String(),
Network: network,
}
}
nodeAnn, err := r.server.genNodeAnnouncement(false)
if err != nil {
return nil, fmt.Errorf("unable to retrieve current fully signed "+
"node announcement: %v", err)
}
addrs := nodeAnn.Addresses
uris := make([]string, len(addrs))
for i, addr := range addrs {
uris[i] = fmt.Sprintf("%s@%s", encodedIDPub, addr.String())
}
isGraphSynced := r.server.authGossiper.SyncManager().IsGraphSynced()
features := make(map[uint32]*lnrpc.Feature)
sets := r.server.featureMgr.ListSets()
for _, set := range sets {
featureVector := r.server.featureMgr.Get(set)
rpcFeatures := invoicesrpc.CreateRPCFeatures(featureVector)
for bit, feature := range rpcFeatures {
features[bit] = feature
}
}
return &lnrpc.GetInfoResponse{
IdentityPubkey: encodedIDPub,
NumPendingChannels: nPendingChannels,
NumActiveChannels: activeChannels,
NumInactiveChannels: inactiveChannels,
NumPeers: uint32(len(serverPeers)),
BlockHeight: uint32(bestHeight),
BlockHash: bestHash.String(),
SyncedToChain: isSynced,
Testnet: isTestnet(&activeNetParams),
Chains: activeChains,
Uris: uris,
Alias: nodeAnn.Alias.String(),
Color: routing.EncodeHexColor(nodeAnn.RGBColor),
BestHeaderTimestamp: int64(bestHeaderTimestamp),
Version: build.Version() + " commit=" + build.Commit,
CommitHash: build.CommitHash,
SyncedToGraph: isGraphSynced,
Features: features,
}, nil
}
func (r *rpcServer) ListPeers(ctx context.Context,
in *lnrpc.ListPeersRequest) (*lnrpc.ListPeersResponse, error) {
rpcsLog.Tracef("[listpeers] request")
serverPeers := r.server.Peers()
resp := &lnrpc.ListPeersResponse{
Peers: make([]*lnrpc.Peer, 0, len(serverPeers)),
}
for _, serverPeer := range serverPeers {
var (
satSent int64
satRecv int64
)
chans := serverPeer.ChannelSnapshots()
for _, c := range chans {
satSent += int64(c.TotalMSatSent.ToSatoshis())
satRecv += int64(c.TotalMSatReceived.ToSatoshis())
}
nodePub := serverPeer.PubKey()
syncer, ok := r.server.authGossiper.SyncManager().GossipSyncer(
nodePub,
)
var lnrpcSyncType lnrpc.Peer_SyncType
if !ok {
rpcsLog.Warnf("Gossip syncer for peer=%x not found",
nodePub)
lnrpcSyncType = lnrpc.Peer_UNKNOWN_SYNC
} else {
syncType := syncer.SyncType()
switch syncType {
case discovery.ActiveSync:
lnrpcSyncType = lnrpc.Peer_ACTIVE_SYNC
case discovery.PassiveSync:
lnrpcSyncType = lnrpc.Peer_PASSIVE_SYNC
default:
return nil, fmt.Errorf("unhandled sync type %v",
syncType)
}
}
features := invoicesrpc.CreateRPCFeatures(
serverPeer.RemoteFeatures(),
)
peer := &lnrpc.Peer{
PubKey: hex.EncodeToString(nodePub[:]),
Address: serverPeer.conn.RemoteAddr().String(),
Inbound: serverPeer.inbound,
BytesRecv: atomic.LoadUint64(&serverPeer.bytesReceived),
BytesSent: atomic.LoadUint64(&serverPeer.bytesSent),
SatSent: satSent,
SatRecv: satRecv,
PingTime: serverPeer.PingTime(),
SyncType: lnrpcSyncType,
Features: features,
}
var peerErrors []interface{}
if in.LatestError {
latestErr := serverPeer.errorBuffer.Latest()
if latestErr != nil {
peerErrors = []interface{}{latestErr}
}
} else {
peerErrors = serverPeer.errorBuffer.List()
}
for _, error := range peerErrors {
tsError := error.(*timestampedError)
rpcErr := &lnrpc.TimestampedError{
Timestamp: uint64(tsError.timestamp.Unix()),
Error: tsError.error.Error(),
}
peer.Errors = append(peer.Errors, rpcErr)
}
resp.Peers = append(resp.Peers, peer)
}
rpcsLog.Debugf("[listpeers] yielded %v peers", serverPeers)
return resp, nil
}
func (r *rpcServer) SubscribePeerEvents(req *lnrpc.PeerEventSubscription,
eventStream lnrpc.Lightning_SubscribePeerEventsServer) error {
peerEventSub, err := r.server.peerNotifier.SubscribePeerEvents()
if err != nil {
return err
}
defer peerEventSub.Cancel()
for {
select {
case e := <-peerEventSub.Updates():
var event *lnrpc.PeerEvent
switch peerEvent := e.(type) {
case peernotifier.PeerOfflineEvent:
event = &lnrpc.PeerEvent{
PubKey: hex.EncodeToString(peerEvent.PubKey[:]),
Type: lnrpc.PeerEvent_PEER_OFFLINE,
}
case peernotifier.PeerOnlineEvent:
event = &lnrpc.PeerEvent{
PubKey: hex.EncodeToString(peerEvent.PubKey[:]),
Type: lnrpc.PeerEvent_PEER_ONLINE,
}
default:
return fmt.Errorf("unexpected peer event: %v", event)
}
if err := eventStream.Send(event); err != nil {
return err
}
case <-r.quit:
return nil
}
}
}
func (r *rpcServer) WalletBalance(ctx context.Context,
in *lnrpc.WalletBalanceRequest) (*lnrpc.WalletBalanceResponse, error) {
totalBal, err := r.server.cc.wallet.ConfirmedBalance(0)
if err != nil {
return nil, err
}
confirmedBal, err := r.server.cc.wallet.ConfirmedBalance(1)
if err != nil {
return nil, err
}
unconfirmedBal := totalBal - confirmedBal
rpcsLog.Debugf("[walletbalance] Total balance=%v (confirmed=%v, "+
"unconfirmed=%v)", totalBal, confirmedBal, unconfirmedBal)
return &lnrpc.WalletBalanceResponse{
TotalBalance: int64(totalBal),
ConfirmedBalance: int64(confirmedBal),
UnconfirmedBalance: int64(unconfirmedBal),
}, nil
}
func (r *rpcServer) ChannelBalance(ctx context.Context,
in *lnrpc.ChannelBalanceRequest) (*lnrpc.ChannelBalanceResponse, error) {
openChannels, err := r.server.chanDB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
var balance btcutil.Amount
for _, channel := range openChannels {
balance += channel.LocalCommitment.LocalBalance.ToSatoshis()
}
pendingChannels, err := r.server.chanDB.FetchPendingChannels()
if err != nil {
return nil, err
}
var pendingOpenBalance btcutil.Amount
for _, channel := range pendingChannels {
pendingOpenBalance += channel.LocalCommitment.LocalBalance.ToSatoshis()
}
rpcsLog.Debugf("[channelbalance] balance=%v pending-open=%v",
balance, pendingOpenBalance)
return &lnrpc.ChannelBalanceResponse{
Balance: int64(balance),
PendingOpenBalance: int64(pendingOpenBalance),
}, nil
}
func (r *rpcServer) PendingChannels(ctx context.Context,
in *lnrpc.PendingChannelsRequest) (*lnrpc.PendingChannelsResponse, error) {
rpcsLog.Debugf("[pendingchannels]")
resp := &lnrpc.PendingChannelsResponse{}
rpcInitiator := func(isInitiator bool) lnrpc.Initiator {
if isInitiator {
return lnrpc.Initiator_INITIATOR_LOCAL
}
return lnrpc.Initiator_INITIATOR_REMOTE
}
pendingOpenChannels, err := r.server.chanDB.FetchPendingChannels()
if err != nil {
rpcsLog.Errorf("unable to fetch pending channels: %v", err)
return nil, err
}
resp.PendingOpenChannels = make([]*lnrpc.PendingChannelsResponse_PendingOpenChannel,
len(pendingOpenChannels))
for i, pendingChan := range pendingOpenChannels {
pub := pendingChan.IdentityPub.SerializeCompressed()
localCommitment := pendingChan.LocalCommitment
utx := btcutil.NewTx(localCommitment.CommitTx)
commitBaseWeight := blockchain.GetTransactionWeight(utx)
commitWeight := commitBaseWeight + input.WitnessCommitmentTxWeight
resp.PendingOpenChannels[i] = &lnrpc.PendingChannelsResponse_PendingOpenChannel{
Channel: &lnrpc.PendingChannelsResponse_PendingChannel{
RemoteNodePub: hex.EncodeToString(pub),
ChannelPoint: pendingChan.FundingOutpoint.String(),
Capacity: int64(pendingChan.Capacity),
LocalBalance: int64(localCommitment.LocalBalance.ToSatoshis()),
RemoteBalance: int64(localCommitment.RemoteBalance.ToSatoshis()),
LocalChanReserveSat: int64(pendingChan.LocalChanCfg.ChanReserve),
RemoteChanReserveSat: int64(pendingChan.RemoteChanCfg.ChanReserve),
Initiator: rpcInitiator(pendingChan.IsInitiator),
CommitmentType: rpcCommitmentType(pendingChan.ChanType),
},
CommitWeight: commitWeight,
CommitFee: int64(localCommitment.CommitFee),
FeePerKw: int64(localCommitment.FeePerKw),
}
}
_, currentHeight, err := r.server.cc.chainIO.GetBestBlock()
if err != nil {
return nil, err
}
pendingCloseChannels, err := r.server.chanDB.FetchClosedChannels(true)
if err != nil {
rpcsLog.Errorf("unable to fetch closed channels: %v", err)
return nil, err
}
for _, pendingClose := range pendingCloseChannels {
pub := pendingClose.RemotePub.SerializeCompressed()
chanPoint := pendingClose.ChanPoint
channel := &lnrpc.PendingChannelsResponse_PendingChannel{
RemoteNodePub: hex.EncodeToString(pub),
ChannelPoint: chanPoint.String(),
Capacity: int64(pendingClose.Capacity),
LocalBalance: int64(pendingClose.SettledBalance),
CommitmentType: lnrpc.CommitmentType_UNKNOWN_COMMITMENT_TYPE,
Initiator: lnrpc.Initiator_INITIATOR_UNKNOWN,
}
historical, err := r.server.chanDB.FetchHistoricalChannel(
&pendingClose.ChanPoint,
)
switch err {
case channeldb.ErrNoHistoricalBucket:
case channeldb.ErrChannelNotFound:
case nil:
channel.Initiator = rpcInitiator(historical.IsInitiator)
channel.CommitmentType = rpcCommitmentType(
historical.ChanType,
)
default:
return nil, err
}
closeTXID := pendingClose.ClosingTXID.String()
switch pendingClose.CloseType {
case channeldb.CooperativeClose:
rpcsLog.Warn("channel %v cooperatively closed and "+
"in pending close state",
pendingClose.ChanPoint)
case channeldb.LocalForceClose, channeldb.RemoteForceClose:
forceClose := &lnrpc.PendingChannelsResponse_ForceClosedChannel{
Channel: channel,
ClosingTxid: closeTXID,
}
err := r.nurseryPopulateForceCloseResp(
&chanPoint, currentHeight, forceClose,
)
if err != nil {
return nil, err
}
err = r.arbitratorPopulateForceCloseResp(
&chanPoint, currentHeight, forceClose,
)
if err != nil {
return nil, err
}
resp.TotalLimboBalance += int64(forceClose.LimboBalance)
resp.PendingForceClosingChannels = append(
resp.PendingForceClosingChannels,
forceClose,
)
}
}
waitingCloseChans, err := r.server.chanDB.FetchWaitingCloseChannels()
if err != nil {
rpcsLog.Errorf("unable to fetch channels waiting close: %v",
err)
return nil, err
}
for _, waitingClose := range waitingCloseChans {
pub := waitingClose.IdentityPub.SerializeCompressed()
chanPoint := waitingClose.FundingOutpoint
var commitments lnrpc.PendingChannelsResponse_Commitments
if waitingClose.LocalCommitment.CommitTx != nil {
commitments.LocalTxid =
waitingClose.LocalCommitment.CommitTx.TxHash().
String()
commitments.LocalCommitFeeSat = uint64(
waitingClose.LocalCommitment.CommitFee,
)
}
if waitingClose.RemoteCommitment.CommitTx != nil {
commitments.RemoteTxid =
waitingClose.RemoteCommitment.CommitTx.TxHash().
String()
commitments.RemoteCommitFeeSat = uint64(
waitingClose.RemoteCommitment.CommitFee,
)
}
remoteCommitDiff, err := waitingClose.RemoteCommitChainTip()
switch {
case err == channeldb.ErrNoPendingCommit:
case err != nil:
return nil, err
default:
hash := remoteCommitDiff.Commitment.CommitTx.TxHash()
commitments.RemotePendingTxid = hash.String()
commitments.RemoteCommitFeeSat = uint64(
remoteCommitDiff.Commitment.CommitFee,
)
}
channel := &lnrpc.PendingChannelsResponse_PendingChannel{
RemoteNodePub: hex.EncodeToString(pub),
ChannelPoint: chanPoint.String(),
Capacity: int64(waitingClose.Capacity),
LocalBalance: int64(waitingClose.LocalCommitment.LocalBalance.ToSatoshis()),
RemoteBalance: int64(waitingClose.LocalCommitment.RemoteBalance.ToSatoshis()),
LocalChanReserveSat: int64(waitingClose.LocalChanCfg.ChanReserve),
RemoteChanReserveSat: int64(waitingClose.RemoteChanCfg.ChanReserve),
Initiator: rpcInitiator(waitingClose.IsInitiator),
CommitmentType: rpcCommitmentType(waitingClose.ChanType),
}
waitingCloseResp := &lnrpc.PendingChannelsResponse_WaitingCloseChannel{
Channel: channel,
LimboBalance: channel.LocalBalance,
Commitments: &commitments,
}
resp.WaitingCloseChannels = append(
resp.WaitingCloseChannels, waitingCloseResp,
)
resp.TotalLimboBalance += channel.LocalBalance
}
return resp, nil
}
func (r *rpcServer) arbitratorPopulateForceCloseResp(chanPoint *wire.OutPoint,
currentHeight int32,
forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel) error {
arbitrator, err := r.server.chainArb.GetChannelArbitrator(*chanPoint)
if err != nil {
return err
}
reports := arbitrator.Report()
for _, report := range reports {
switch report.Type {
case contractcourt.ReportOutputUnencumbered:
forceClose.MaturityHeight = report.MaturityHeight
if forceClose.MaturityHeight != 0 {
forceClose.BlocksTilMaturity =
int32(forceClose.MaturityHeight) -
currentHeight
}
case contractcourt.ReportOutputIncomingHtlc,
contractcourt.ReportOutputOutgoingHtlc:
if report.LimboBalance == 0 {
break
}
incoming := report.Type == contractcourt.ReportOutputIncomingHtlc
htlc := &lnrpc.PendingHTLC{
Incoming: incoming,
Amount: int64(report.Amount),
Outpoint: report.Outpoint.String(),
MaturityHeight: report.MaturityHeight,
Stage: report.Stage,
}
if htlc.MaturityHeight != 0 {
htlc.BlocksTilMaturity =
int32(htlc.MaturityHeight) - currentHeight
}
forceClose.PendingHtlcs = append(forceClose.PendingHtlcs, htlc)
case contractcourt.ReportOutputAnchor:
switch {
case report.RecoveredBalance != 0:
forceClose.Anchor = lnrpc.PendingChannelsResponse_ForceClosedChannel_RECOVERED
case report.LimboBalance != 0:
forceClose.Anchor = lnrpc.PendingChannelsResponse_ForceClosedChannel_LIMBO
default:
forceClose.Anchor = lnrpc.PendingChannelsResponse_ForceClosedChannel_LOST
}
default:
return fmt.Errorf("unknown report output type: %v",
report.Type)
}
forceClose.LimboBalance += int64(report.LimboBalance)
forceClose.RecoveredBalance += int64(report.RecoveredBalance)
}
return nil
}
func (r *rpcServer) nurseryPopulateForceCloseResp(chanPoint *wire.OutPoint,
currentHeight int32,
forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel) error {
nurseryInfo, err := r.server.utxoNursery.NurseryReport(chanPoint)
if err == ErrContractNotFound {
return nil
}
if err != nil {
return fmt.Errorf("unable to obtain "+
"nursery report for ChannelPoint(%v): %v",
chanPoint, err)
}
forceClose.LimboBalance = int64(nurseryInfo.limboBalance)
forceClose.RecoveredBalance = int64(nurseryInfo.recoveredBalance)
for _, htlcReport := range nurseryInfo.htlcs {
htlc := &lnrpc.PendingHTLC{
Incoming: false,
Amount: int64(htlcReport.amount),
Outpoint: htlcReport.outpoint.String(),
MaturityHeight: htlcReport.maturityHeight,
Stage: htlcReport.stage,
}
if htlc.MaturityHeight != 0 {
htlc.BlocksTilMaturity =
int32(htlc.MaturityHeight) -
currentHeight
}
forceClose.PendingHtlcs = append(forceClose.PendingHtlcs,
htlc)
}
return nil
}
func (r *rpcServer) ClosedChannels(ctx context.Context,
in *lnrpc.ClosedChannelsRequest) (*lnrpc.ClosedChannelsResponse,
error) {
filterResults := in.Cooperative || in.LocalForce ||
in.RemoteForce || in.Breach || in.FundingCanceled ||
in.Abandoned
resp := &lnrpc.ClosedChannelsResponse{}
dbChannels, err := r.server.chanDB.FetchClosedChannels(false)
if err != nil {
return nil, err
}
sort.Slice(dbChannels, func(i, j int) bool {
return dbChannels[i].CloseHeight < dbChannels[j].CloseHeight
})
for _, dbChannel := range dbChannels {
if dbChannel.IsPending {
continue
}
switch dbChannel.CloseType {
case channeldb.CooperativeClose:
if filterResults && !in.Cooperative {
continue
}
case channeldb.LocalForceClose:
if filterResults && !in.LocalForce {
continue
}
case channeldb.RemoteForceClose:
if filterResults && !in.RemoteForce {
continue
}
case channeldb.BreachClose:
if filterResults && !in.Breach {
continue
}
case channeldb.FundingCanceled:
if filterResults && !in.FundingCanceled {
continue
}
case channeldb.Abandoned:
if filterResults && !in.Abandoned {
continue
}
}
channel, err := r.createRPCClosedChannel(dbChannel)
if err != nil {
return nil, err
}
resp.Channels = append(resp.Channels, channel)
}
return resp, nil
}
func (r *rpcServer) ListChannels(ctx context.Context,
in *lnrpc.ListChannelsRequest) (*lnrpc.ListChannelsResponse, error) {
if in.ActiveOnly && in.InactiveOnly {
return nil, fmt.Errorf("either `active_only` or " +
"`inactive_only` can be set, but not both")
}
if in.PublicOnly && in.PrivateOnly {
return nil, fmt.Errorf("either `public_only` or " +
"`private_only` can be set, but not both")
}
if len(in.Peer) > 0 && len(in.Peer) != 33 {
_, err := route.NewVertexFromBytes(in.Peer)
return nil, fmt.Errorf("invalid `peer` key: %v", err)
}
resp := &lnrpc.ListChannelsResponse{}
graph := r.server.chanDB.ChannelGraph()
dbChannels, err := r.server.chanDB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
rpcsLog.Debugf("[listchannels] fetched %v channels from DB",
len(dbChannels))
for _, dbChannel := range dbChannels {
nodePub := dbChannel.IdentityPub
nodePubBytes := nodePub.SerializeCompressed()
chanPoint := dbChannel.FundingOutpoint
if len(in.Peer) > 0 && !bytes.Equal(nodePubBytes, in.Peer) {
continue
}
var peerOnline bool
if _, err := r.server.FindPeer(nodePub); err == nil {
peerOnline = true
}
channelID := lnwire.NewChanIDFromOutPoint(&chanPoint)
var linkActive bool
if link, err := r.server.htlcSwitch.GetLink(channelID); err == nil {
linkActive = link.EligibleToForward()
}
isActive := peerOnline && linkActive
channel, err := createRPCOpenChannel(r, graph, dbChannel, isActive)
if err != nil {
return nil, err
}
switch {
case in.ActiveOnly && !isActive:
continue
case in.InactiveOnly && isActive:
continue
case in.PublicOnly && channel.Private:
continue
case in.PrivateOnly && !channel.Private:
continue
}
resp.Channels = append(resp.Channels, channel)
}
return resp, nil
}
func rpcCommitmentType(chanType channeldb.ChannelType) lnrpc.CommitmentType {
if chanType.HasAnchors() {
return lnrpc.CommitmentType_ANCHORS
}
if chanType.IsTweakless() {
return lnrpc.CommitmentType_STATIC_REMOTE_KEY
}
return lnrpc.CommitmentType_LEGACY
}
func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
dbChannel *channeldb.OpenChannel, isActive bool) (*lnrpc.Channel, error) {
nodePub := dbChannel.IdentityPub
nodeID := hex.EncodeToString(nodePub.SerializeCompressed())
chanPoint := dbChannel.FundingOutpoint
isPublic := dbChannel.ChannelFlags&lnwire.FFAnnounceChannel != 0
localCommit := dbChannel.LocalCommitment
utx := btcutil.NewTx(localCommit.CommitTx)
commitBaseWeight := blockchain.GetTransactionWeight(utx)
commitWeight := commitBaseWeight + input.WitnessCommitmentTxWeight
localBalance := localCommit.LocalBalance
remoteBalance := localCommit.RemoteBalance
var sumOutputs btcutil.Amount
for _, txOut := range localCommit.CommitTx.TxOut {
sumOutputs += btcutil.Amount(txOut.Value)
}
externalCommitFee := dbChannel.Capacity - sumOutputs
commitmentType := rpcCommitmentType(dbChannel.ChanType)
channel := &lnrpc.Channel{
Active: isActive,
Private: !isPublic,
RemotePubkey: nodeID,
ChannelPoint: chanPoint.String(),
ChanId: dbChannel.ShortChannelID.ToUint64(),
Capacity: int64(dbChannel.Capacity),
LocalBalance: int64(localBalance.ToSatoshis()),
RemoteBalance: int64(remoteBalance.ToSatoshis()),
CommitFee: int64(externalCommitFee),
CommitWeight: commitWeight,
FeePerKw: int64(localCommit.FeePerKw),
TotalSatoshisSent: int64(dbChannel.TotalMSatSent.ToSatoshis()),
TotalSatoshisReceived: int64(dbChannel.TotalMSatReceived.ToSatoshis()),
NumUpdates: localCommit.CommitHeight,
PendingHtlcs: make([]*lnrpc.HTLC, len(localCommit.Htlcs)),
CsvDelay: uint32(dbChannel.LocalChanCfg.CsvDelay),
Initiator: dbChannel.IsInitiator,
ChanStatusFlags: dbChannel.ChanStatus().String(),
LocalChanReserveSat: int64(dbChannel.LocalChanCfg.ChanReserve),
RemoteChanReserveSat: int64(dbChannel.RemoteChanCfg.ChanReserve),
StaticRemoteKey: commitmentType == lnrpc.CommitmentType_STATIC_REMOTE_KEY,
CommitmentType: commitmentType,
ThawHeight: dbChannel.ThawHeight,
}
for i, htlc := range localCommit.Htlcs {
var rHash [32]byte
copy(rHash[:], htlc.RHash[:])
channel.PendingHtlcs[i] = &lnrpc.HTLC{
Incoming: htlc.Incoming,
Amount: int64(htlc.Amt.ToSatoshis()),
HashLock: rHash[:],
ExpirationHeight: htlc.RefundTimeout,
}
channel.UnsettledBalance += channel.PendingHtlcs[i].Amount
}
localBalance, remoteBalance, err := dbChannel.BalancesAtHeight(0)
if err != nil {
return nil, err
}
if dbChannel.IsInitiator {
channel.PushAmountSat = uint64(remoteBalance.ToSatoshis())
} else {
channel.PushAmountSat = uint64(localBalance.ToSatoshis())
}
if len(dbChannel.LocalShutdownScript) > 0 {
_, addresses, _, err := txscript.ExtractPkScriptAddrs(
dbChannel.LocalShutdownScript, activeNetParams.Params,
)
if err != nil {
return nil, err
}
if len(addresses) != 1 {
return nil, fmt.Errorf("expected one upfront shutdown "+
"address, got: %v", len(addresses))
}
channel.CloseAddress = addresses[0].String()
}
if !r.server.Started() {
return channel, nil
}
outpoint := dbChannel.FundingOutpoint
startTime, endTime, err := r.server.chanEventStore.GetLifespan(outpoint)
switch err {
case chanfitness.ErrChannelNotFound:
rpcsLog.Infof("channel: %v not found by channel event store",
outpoint)
return channel, nil
case nil:
default:
return nil, err
}
if endTime.IsZero() {
endTime = time.Now()
}
channel.Lifetime = int64(endTime.Sub(startTime).Seconds())
uptime, err := r.server.chanEventStore.GetUptime(
outpoint, startTime, endTime,
)
if err != nil {
return nil, err
}
channel.Uptime = int64(uptime.Seconds())
return channel, nil
}
func (r *rpcServer) createRPCClosedChannel(
dbChannel *channeldb.ChannelCloseSummary) (*lnrpc.ChannelCloseSummary, error) {
nodePub := dbChannel.RemotePub
nodeID := hex.EncodeToString(nodePub.SerializeCompressed())
var (
closeType lnrpc.ChannelCloseSummary_ClosureType
openInit lnrpc.Initiator
closeInitiator lnrpc.Initiator
err error
)
openInit, closeInitiator, err = r.getInitiators(
&dbChannel.ChanPoint,
)
if err != nil {
return nil, err
}
switch dbChannel.CloseType {
case channeldb.CooperativeClose:
closeType = lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE
case channeldb.LocalForceClose:
closeType = lnrpc.ChannelCloseSummary_LOCAL_FORCE_CLOSE
case channeldb.RemoteForceClose:
closeType = lnrpc.ChannelCloseSummary_REMOTE_FORCE_CLOSE
case channeldb.BreachClose:
closeType = lnrpc.ChannelCloseSummary_BREACH_CLOSE
case channeldb.FundingCanceled:
closeType = lnrpc.ChannelCloseSummary_FUNDING_CANCELED
case channeldb.Abandoned:
closeType = lnrpc.ChannelCloseSummary_ABANDONED
}
return &lnrpc.ChannelCloseSummary{
Capacity: int64(dbChannel.Capacity),
RemotePubkey: nodeID,
CloseHeight: dbChannel.CloseHeight,
CloseType: closeType,
ChannelPoint: dbChannel.ChanPoint.String(),
ChanId: dbChannel.ShortChanID.ToUint64(),
SettledBalance: int64(dbChannel.SettledBalance),
TimeLockedBalance: int64(dbChannel.TimeLockedBalance),
ChainHash: dbChannel.ChainHash.String(),
ClosingTxHash: dbChannel.ClosingTXID.String(),
OpenInitiator: openInit,
CloseInitiator: closeInitiator,
}, nil
}
func (r *rpcServer) getInitiators(chanPoint *wire.OutPoint) (
lnrpc.Initiator,
lnrpc.Initiator, error) {
var (
openInitiator = lnrpc.Initiator_INITIATOR_UNKNOWN
closeInitiator = lnrpc.Initiator_INITIATOR_UNKNOWN
)
histChan, err := r.server.chanDB.FetchHistoricalChannel(chanPoint)
switch {
case err == channeldb.ErrNoHistoricalBucket:
return openInitiator, closeInitiator, nil
case err == channeldb.ErrChannelNotFound:
return openInitiator, closeInitiator, nil
case err != nil:
return 0, 0, err
}
if histChan.IsInitiator {
openInitiator = lnrpc.Initiator_INITIATOR_LOCAL
} else {
openInitiator = lnrpc.Initiator_INITIATOR_REMOTE
}
localInit := histChan.HasChanStatus(
channeldb.ChanStatusLocalCloseInitiator,
)
remoteInit := histChan.HasChanStatus(
channeldb.ChanStatusRemoteCloseInitiator,
)
switch {
case localInit && remoteInit:
closeInitiator = lnrpc.Initiator_INITIATOR_BOTH
case localInit:
closeInitiator = lnrpc.Initiator_INITIATOR_LOCAL
case remoteInit:
closeInitiator = lnrpc.Initiator_INITIATOR_REMOTE
}
return openInitiator, closeInitiator, nil
}
func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
updateStream lnrpc.Lightning_SubscribeChannelEventsServer) error {
channelEventSub, err := r.server.channelNotifier.SubscribeChannelEvents()
if err != nil {
return err
}
defer channelEventSub.Cancel()
graph := r.server.chanDB.ChannelGraph()
for {
select {
case e := <-channelEventSub.Updates():
var update *lnrpc.ChannelEventUpdate
switch event := e.(type) {
case channelnotifier.PendingOpenChannelEvent:
update = &lnrpc.ChannelEventUpdate{
Type: lnrpc.ChannelEventUpdate_PENDING_OPEN_CHANNEL,
Channel: &lnrpc.ChannelEventUpdate_PendingOpenChannel{
PendingOpenChannel: &lnrpc.PendingUpdate{
Txid: event.ChannelPoint.Hash[:],
OutputIndex: event.ChannelPoint.Index,
},
},
}
case channelnotifier.OpenChannelEvent:
channel, err := createRPCOpenChannel(r, graph,
event.Channel, true)
if err != nil {
return err
}
update = &lnrpc.ChannelEventUpdate{
Type: lnrpc.ChannelEventUpdate_OPEN_CHANNEL,
Channel: &lnrpc.ChannelEventUpdate_OpenChannel{
OpenChannel: channel,
},
}
case channelnotifier.ClosedChannelEvent:
closedChannel, err := r.createRPCClosedChannel(
event.CloseSummary,
)
if err != nil {
return err
}
update = &lnrpc.ChannelEventUpdate{
Type: lnrpc.ChannelEventUpdate_CLOSED_CHANNEL,
Channel: &lnrpc.ChannelEventUpdate_ClosedChannel{
ClosedChannel: closedChannel,
},
}
case channelnotifier.ActiveChannelEvent:
update = &lnrpc.ChannelEventUpdate{
Type: lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL,
Channel: &lnrpc.ChannelEventUpdate_ActiveChannel{
ActiveChannel: &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: event.ChannelPoint.Hash[:],
},
OutputIndex: event.ChannelPoint.Index,
},
},
}
case channelnotifier.InactiveChannelEvent:
update = &lnrpc.ChannelEventUpdate{
Type: lnrpc.ChannelEventUpdate_INACTIVE_CHANNEL,
Channel: &lnrpc.ChannelEventUpdate_InactiveChannel{
InactiveChannel: &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: event.ChannelPoint.Hash[:],
},
OutputIndex: event.ChannelPoint.Index,
},
},
}
case channelnotifier.ActiveLinkEvent:
continue
default:
return fmt.Errorf("unexpected channel event update: %v", event)
}
if err := updateStream.Send(update); err != nil {
return err
}
case <-r.quit:
return nil
}
}
}
type paymentStream struct {
recv func() (*rpcPaymentRequest, error)
send func(*lnrpc.SendResponse) error
}
type rpcPaymentRequest struct {
*lnrpc.SendRequest
route *route.Route
}
func (r *rpcServer) SendPayment(stream lnrpc.Lightning_SendPaymentServer) error {
var lock sync.Mutex
return r.sendPayment(&paymentStream{
recv: func() (*rpcPaymentRequest, error) {
req, err := stream.Recv()
if err != nil {
return nil, err
}
return &rpcPaymentRequest{
SendRequest: req,
}, nil
},
send: func(r *lnrpc.SendResponse) error {
lock.Lock()
defer lock.Unlock()
return stream.Send(r)
},
})
}
func (r *rpcServer) SendToRoute(stream lnrpc.Lightning_SendToRouteServer) error {
var lock sync.Mutex
return r.sendPayment(&paymentStream{
recv: func() (*rpcPaymentRequest, error) {
req, err := stream.Recv()
if err != nil {
return nil, err
}
return r.unmarshallSendToRouteRequest(req)
},
send: func(r *lnrpc.SendResponse) error {
lock.Lock()
defer lock.Unlock()
return stream.Send(r)
},
})
}
func (r *rpcServer) unmarshallSendToRouteRequest(
req *lnrpc.SendToRouteRequest) (*rpcPaymentRequest, error) {
if req.Route == nil {
return nil, fmt.Errorf("unable to send, no route provided")
}
route, err := r.routerBackend.UnmarshallRoute(req.Route)
if err != nil {
return nil, err
}
return &rpcPaymentRequest{
SendRequest: &lnrpc.SendRequest{
PaymentHash: req.PaymentHash,
PaymentHashString: req.PaymentHashString,
},
route: route,
}, nil
}
type rpcPaymentIntent struct {
msat lnwire.MilliSatoshi
feeLimit lnwire.MilliSatoshi
cltvLimit uint32
dest route.Vertex
rHash [32]byte
cltvDelta uint16
routeHints [][]zpay32.HopHint
outgoingChannelID *uint64
lastHop *route.Vertex
destFeatures *lnwire.FeatureVector
paymentAddr *[32]byte
payReq []byte
destCustomRecords record.CustomSet
route *route.Route
}
func (r *rpcServer) extractPaymentIntent(rpcPayReq *rpcPaymentRequest) (rpcPaymentIntent, error) {
payIntent := rpcPaymentIntent{}
if rpcPayReq.route != nil {
if rpcPayReq.PaymentHashString != "" {
paymentHash, err := hex.DecodeString(
rpcPayReq.PaymentHashString,
)
if err != nil {
return payIntent, err
}
copy(payIntent.rHash[:], paymentHash)
} else {
copy(payIntent.rHash[:], rpcPayReq.PaymentHash)
}
payIntent.route = rpcPayReq.route
return payIntent, nil
}
if rpcPayReq.OutgoingChanId != 0 {
payIntent.outgoingChannelID = &rpcPayReq.OutgoingChanId
}
if len(rpcPayReq.LastHopPubkey) > 0 {
lastHop, err := route.NewVertexFromBytes(
rpcPayReq.LastHopPubkey,
)
if err != nil {
return payIntent, err
}
payIntent.lastHop = &lastHop
}
cltvLimit, err := routerrpc.ValidateCLTVLimit(
rpcPayReq.CltvLimit, cfg.MaxOutgoingCltvExpiry,
)
if err != nil {
return payIntent, err
}
payIntent.cltvLimit = cltvLimit
customRecords := record.CustomSet(rpcPayReq.DestCustomRecords)
if err := customRecords.Validate(); err != nil {
return payIntent, err
}
payIntent.destCustomRecords = customRecords
validateDest := func(dest route.Vertex) error {
if rpcPayReq.AllowSelfPayment {
return nil
}
if dest == r.selfNode {
return errors.New("self-payments not allowed")
}
return nil
}
if rpcPayReq.PaymentRequest != "" {
payReq, err := zpay32.Decode(
rpcPayReq.PaymentRequest, activeNetParams.Params,
)
if err != nil {
return payIntent, err
}
err = routerrpc.ValidatePayReqExpiry(payReq)
if err != nil {
return payIntent, err
}
if payReq.MilliSat == nil {
amt, err := lnrpc.UnmarshallAmt(
rpcPayReq.Amt, rpcPayReq.AmtMsat,
)
if err != nil {
return payIntent, err
}
if amt == 0 {
return payIntent, errors.New("amount must be " +
"specified when paying a zero amount " +
"invoice")
}
payIntent.msat = amt
} else {
payIntent.msat = *payReq.MilliSat
}
payIntent.feeLimit = lnrpc.CalculateFeeLimit(
rpcPayReq.FeeLimit, payIntent.msat,
)
copy(payIntent.rHash[:], payReq.PaymentHash[:])
destKey := payReq.Destination.SerializeCompressed()
copy(payIntent.dest[:], destKey)
payIntent.cltvDelta = uint16(payReq.MinFinalCLTVExpiry())
payIntent.routeHints = payReq.RouteHints
payIntent.payReq = []byte(rpcPayReq.PaymentRequest)
payIntent.destFeatures = payReq.Features
payIntent.paymentAddr = payReq.PaymentAddr
if err := validateDest(payIntent.dest); err != nil {
return payIntent, err
}
return payIntent, nil
}
var pubBytes []byte
if len(rpcPayReq.Dest) != 0 {
pubBytes = rpcPayReq.Dest
} else {
var err error
pubBytes, err = hex.DecodeString(rpcPayReq.DestString)
if err != nil {
return payIntent, err
}
}
if len(pubBytes) != 33 {
return payIntent, errors.New("invalid key length")
}
copy(payIntent.dest[:], pubBytes)
if err := validateDest(payIntent.dest); err != nil {
return payIntent, err
}
payIntent.msat, err = lnrpc.UnmarshallAmt(
rpcPayReq.Amt, rpcPayReq.AmtMsat,
)
if err != nil {
return payIntent, err
}
payIntent.feeLimit = lnrpc.CalculateFeeLimit(
rpcPayReq.FeeLimit, payIntent.msat,
)
if rpcPayReq.FinalCltvDelta != 0 {
payIntent.cltvDelta = uint16(rpcPayReq.FinalCltvDelta)
} else {
payIntent.cltvDelta = uint16(cfg.Bitcoin.TimeLockDelta)
}
switch {
case rpcPayReq.PaymentHashString != "":
paymentHash, err := hex.DecodeString(
rpcPayReq.PaymentHashString,
)
if err != nil {
return payIntent, err
}
copy(payIntent.rHash[:], paymentHash)
default:
copy(payIntent.rHash[:], rpcPayReq.PaymentHash)
}
payIntent.destFeatures, err = routerrpc.UnmarshalFeatures(
rpcPayReq.DestFeatures,
)
if err != nil {
return payIntent, err
}
if payIntent.msat > MaxPaymentMSat {
return payIntent, fmt.Errorf("payment of %v is too large, "+
"max payment allowed is %v", payIntent.msat,
MaxPaymentMSat)
}
return payIntent, nil
}
type paymentIntentResponse struct {
Route *route.Route
Preimage [32]byte
Err error
}
func (r *rpcServer) dispatchPaymentIntent(
payIntent *rpcPaymentIntent) (*paymentIntentResponse, error) {
var (
preImage [32]byte
route *route.Route
routerErr error
)
if payIntent.route == nil {
payment := &routing.LightningPayment{
Target: payIntent.dest,
Amount: payIntent.msat,
FinalCLTVDelta: payIntent.cltvDelta,
FeeLimit: payIntent.feeLimit,
CltvLimit: payIntent.cltvLimit,
PaymentHash: payIntent.rHash,
RouteHints: payIntent.routeHints,
OutgoingChannelID: payIntent.outgoingChannelID,
LastHop: payIntent.lastHop,
PaymentRequest: payIntent.payReq,
PayAttemptTimeout: routing.DefaultPayAttemptTimeout,
DestCustomRecords: payIntent.destCustomRecords,
DestFeatures: payIntent.destFeatures,
PaymentAddr: payIntent.paymentAddr,
MaxParts: 1,
}
preImage, route, routerErr = r.server.chanRouter.SendPayment(
payment,
)
} else {
preImage, routerErr = r.server.chanRouter.SendToRoute(
payIntent.rHash, payIntent.route,
)
route = payIntent.route
}
if routerErr != nil {
rpcsLog.Warnf("Unable to send payment: %v", routerErr)
return &paymentIntentResponse{
Err: routerErr,
}, nil
}
return &paymentIntentResponse{
Route: route,
Preimage: preImage,
}, nil
}
func (r *rpcServer) sendPayment(stream *paymentStream) error {
payChan := make(chan *rpcPaymentIntent)
errChan := make(chan error, 1)
if !r.server.Started() {
return ErrServerNotActive
}
const numOutstandingPayments = 2000
htlcSema := make(chan struct{}, numOutstandingPayments)
for i := 0; i < numOutstandingPayments; i++ {
htlcSema <- struct{}{}
}
var wg sync.WaitGroup
reqQuit := make(chan struct{})
defer close(reqQuit)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-reqQuit:
return
default:
nextPayment, err := stream.recv()
if err == io.EOF {
close(payChan)
return
} else if err != nil {
rpcsLog.Errorf("Failed receiving from "+
"stream: %v", err)
select {
case errChan <- err:
default:
}
return
}
payIntent, err := r.extractPaymentIntent(
nextPayment,
)
if err != nil {
if err := stream.send(&lnrpc.SendResponse{
PaymentError: err.Error(),
PaymentHash: payIntent.rHash[:],
}); err != nil {
rpcsLog.Errorf("Failed "+
"sending on "+
"stream: %v", err)
select {
case errChan <- err:
default:
}
return
}
continue
}
select {
case payChan <- &payIntent:
case <-reqQuit:
return
}
}
}
}()
sendLoop:
for {
select {
case err := <-errChan:
return err
case <-r.quit:
return errors.New("rpc server shutting down")
case payIntent, ok := <-payChan:
if !ok {
break sendLoop
}
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-htlcSema:
case <-reqQuit:
return
}
defer func() {
htlcSema <- struct{}{}
}()
resp, saveErr := r.dispatchPaymentIntent(
payIntent,
)
switch {
case saveErr != nil:
rpcsLog.Errorf("Failed dispatching "+
"payment intent: %v", saveErr)
select {
case errChan <- saveErr:
default:
}
return
case resp.Err != nil:
err := stream.send(&lnrpc.SendResponse{
PaymentError: resp.Err.Error(),
PaymentHash: payIntent.rHash[:],
})
if err != nil {
rpcsLog.Errorf("Failed "+
"sending error "+
"response: %v", err)
select {
case errChan <- err:
default:
}
}
return
}
backend := r.routerBackend
marshalledRouted, err := backend.MarshallRoute(
resp.Route,
)
if err != nil {
errChan <- err
return
}
err = stream.send(&lnrpc.SendResponse{
PaymentHash: payIntent.rHash[:],
PaymentPreimage: resp.Preimage[:],
PaymentRoute: marshalledRouted,
})
if err != nil {
rpcsLog.Errorf("Failed sending "+
"response: %v", err)
select {
case errChan <- err:
default:
}
return
}
}()
}
}
wg.Wait()
return nil
}
func (r *rpcServer) SendPaymentSync(ctx context.Context,
nextPayment *lnrpc.SendRequest) (*lnrpc.SendResponse, error) {
return r.sendPaymentSync(ctx, &rpcPaymentRequest{
SendRequest: nextPayment,
})
}
func (r *rpcServer) SendToRouteSync(ctx context.Context,
req *lnrpc.SendToRouteRequest) (*lnrpc.SendResponse, error) {
if req.Route == nil {
return nil, fmt.Errorf("unable to send, no routes provided")
}
paymentRequest, err := r.unmarshallSendToRouteRequest(req)
if err != nil {
return nil, err
}
return r.sendPaymentSync(ctx, paymentRequest)
}
func (r *rpcServer) sendPaymentSync(ctx context.Context,
nextPayment *rpcPaymentRequest) (*lnrpc.SendResponse, error) {
if !r.server.Started() {
return nil, ErrServerNotActive
}
payIntent, err := r.extractPaymentIntent(nextPayment)
if err != nil {
return nil, err
}
resp, saveErr := r.dispatchPaymentIntent(&payIntent)
switch {
case saveErr != nil:
return nil, saveErr
case resp.Err != nil:
return &lnrpc.SendResponse{
PaymentError: resp.Err.Error(),
PaymentHash: payIntent.rHash[:],
}, nil
}
rpcRoute, err := r.routerBackend.MarshallRoute(resp.Route)
if err != nil {
return nil, err
}
return &lnrpc.SendResponse{
PaymentHash: payIntent.rHash[:],
PaymentPreimage: resp.Preimage[:],
PaymentRoute: rpcRoute,
}, nil
}
func (r *rpcServer) AddInvoice(ctx context.Context,
invoice *lnrpc.Invoice) (*lnrpc.AddInvoiceResponse, error) {
defaultDelta := cfg.Bitcoin.TimeLockDelta
if registeredChains.PrimaryChain() == litecoinChain {
defaultDelta = cfg.Litecoin.TimeLockDelta
}
addInvoiceCfg := &invoicesrpc.AddInvoiceConfig{
AddInvoice: r.server.invoices.AddInvoice,
IsChannelActive: r.server.htlcSwitch.HasActiveLink,
ChainParams: activeNetParams.Params,
NodeSigner: r.server.nodeSigner,
DefaultCLTVExpiry: defaultDelta,
ChanDB: r.server.chanDB,
GenInvoiceFeatures: func() *lnwire.FeatureVector {
return r.server.featureMgr.Get(feature.SetInvoice)
},
}
value, err := lnrpc.UnmarshallAmt(invoice.Value, invoice.ValueMsat)
if err != nil {
return nil, err
}
addInvoiceData := &invoicesrpc.AddInvoiceData{
Memo: invoice.Memo,
Value: value,
DescriptionHash: invoice.DescriptionHash,
Expiry: invoice.Expiry,
FallbackAddr: invoice.FallbackAddr,
CltvExpiry: invoice.CltvExpiry,
Private: invoice.Private,
}
if invoice.RPreimage != nil {
preimage, err := lntypes.MakePreimage(invoice.RPreimage)
if err != nil {
return nil, err
}
addInvoiceData.Preimage = &preimage
}
hash, dbInvoice, err := invoicesrpc.AddInvoice(
ctx, addInvoiceCfg, addInvoiceData,
)
if err != nil {
return nil, err
}
return &lnrpc.AddInvoiceResponse{
AddIndex: dbInvoice.AddIndex,
PaymentRequest: string(dbInvoice.PaymentRequest),
RHash: hash[:],
}, nil
}
func (r *rpcServer) LookupInvoice(ctx context.Context,
req *lnrpc.PaymentHash) (*lnrpc.Invoice, error) {
var (
payHash [32]byte
rHash []byte
err error
)
if req.RHashStr != "" {
rHash, err = hex.DecodeString(req.RHashStr)
if err != nil {
return nil, err
}
} else {
rHash = req.RHash
}
if len(rHash) != 0 && len(rHash) != 32 {
return nil, fmt.Errorf("payment hash must be exactly "+
"32 bytes, is instead %v", len(rHash))
}
copy(payHash[:], rHash)
rpcsLog.Tracef("[lookupinvoice] searching for invoice %x", payHash[:])
invoice, err := r.server.invoices.LookupInvoice(payHash)
if err != nil {
return nil, err
}
rpcsLog.Tracef("[lookupinvoice] located invoice %v",
newLogClosure(func() string {
return spew.Sdump(invoice)
}))
rpcInvoice, err := invoicesrpc.CreateRPCInvoice(
&invoice, activeNetParams.Params,
)
if err != nil {
return nil, err
}
return rpcInvoice, nil
}
func (r *rpcServer) ListInvoices(ctx context.Context,
req *lnrpc.ListInvoiceRequest) (*lnrpc.ListInvoiceResponse, error) {
if req.NumMaxInvoices == 0 {
req.NumMaxInvoices = 100
}
q := channeldb.InvoiceQuery{
IndexOffset: req.IndexOffset,
NumMaxInvoices: req.NumMaxInvoices,
PendingOnly: req.PendingOnly,
Reversed: req.Reversed,
}
invoiceSlice, err := r.server.chanDB.QueryInvoices(q)
if err != nil {
return nil, fmt.Errorf("unable to query invoices: %v", err)
}
resp := &lnrpc.ListInvoiceResponse{
Invoices: make([]*lnrpc.Invoice, len(invoiceSlice.Invoices)),
FirstIndexOffset: invoiceSlice.FirstIndexOffset,
LastIndexOffset: invoiceSlice.LastIndexOffset,
}
for i, invoice := range invoiceSlice.Invoices {
resp.Invoices[i], err = invoicesrpc.CreateRPCInvoice(
&invoice, activeNetParams.Params,
)
if err != nil {
return nil, err
}
}
return resp, nil
}
func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription,
updateStream lnrpc.Lightning_SubscribeInvoicesServer) error {
invoiceClient := r.server.invoices.SubscribeNotifications(
req.AddIndex, req.SettleIndex,
)
defer invoiceClient.Cancel()
for {
select {
case newInvoice := <-invoiceClient.NewInvoices:
rpcInvoice, err := invoicesrpc.CreateRPCInvoice(
newInvoice, activeNetParams.Params,
)
if err != nil {
return err
}
if err := updateStream.Send(rpcInvoice); err != nil {
return err
}
case settledInvoice := <-invoiceClient.SettledInvoices:
rpcInvoice, err := invoicesrpc.CreateRPCInvoice(
settledInvoice, activeNetParams.Params,
)
if err != nil {
return err
}
if err := updateStream.Send(rpcInvoice); err != nil {
return err
}
case <-r.quit:
return nil
}
}
}
func (r *rpcServer) SubscribeTransactions(req *lnrpc.GetTransactionsRequest,
updateStream lnrpc.Lightning_SubscribeTransactionsServer) error {
txClient, err := r.server.cc.wallet.SubscribeTransactions()
if err != nil {
return err
}
defer txClient.Cancel()
for {
select {
case tx := <-txClient.ConfirmedTransactions():
destAddresses := make([]string, 0, len(tx.DestAddresses))
for _, destAddress := range tx.DestAddresses {
destAddresses = append(destAddresses, destAddress.EncodeAddress())
}
detail := &lnrpc.Transaction{
TxHash: tx.Hash.String(),
Amount: int64(tx.Value),
NumConfirmations: tx.NumConfirmations,
BlockHash: tx.BlockHash.String(),
BlockHeight: tx.BlockHeight,
TimeStamp: tx.Timestamp,
TotalFees: tx.TotalFees,
DestAddresses: destAddresses,
RawTxHex: hex.EncodeToString(tx.RawTx),
}
if err := updateStream.Send(detail); err != nil {
return err
}
case tx := <-txClient.UnconfirmedTransactions():
var destAddresses []string
for _, destAddress := range tx.DestAddresses {
destAddresses = append(destAddresses, destAddress.EncodeAddress())
}
detail := &lnrpc.Transaction{
TxHash: tx.Hash.String(),
Amount: int64(tx.Value),
TimeStamp: tx.Timestamp,
TotalFees: tx.TotalFees,
DestAddresses: destAddresses,
RawTxHex: hex.EncodeToString(tx.RawTx),
}
if err := updateStream.Send(detail); err != nil {
return err
}
case <-r.quit:
return nil
}
}
}
func (r *rpcServer) GetTransactions(ctx context.Context,
_ *lnrpc.GetTransactionsRequest) (*lnrpc.TransactionDetails, error) {
transactions, err := r.server.cc.wallet.ListTransactionDetails()
if err != nil {
return nil, err
}
txDetails := &lnrpc.TransactionDetails{
Transactions: make([]*lnrpc.Transaction, len(transactions)),
}
for i, tx := range transactions {
var destAddresses []string
for _, destAddress := range tx.DestAddresses {
destAddresses = append(destAddresses, destAddress.EncodeAddress())
}
blockHash := ""
if tx.BlockHash != nil {
blockHash = tx.BlockHash.String()
}
txDetails.Transactions[i] = &lnrpc.Transaction{
TxHash: tx.Hash.String(),
Amount: int64(tx.Value),
NumConfirmations: tx.NumConfirmations,
BlockHash: blockHash,
BlockHeight: tx.BlockHeight,
TimeStamp: tx.Timestamp,
TotalFees: tx.TotalFees,
DestAddresses: destAddresses,
RawTxHex: hex.EncodeToString(tx.RawTx),
}
}
return txDetails, nil
}
func (r *rpcServer) DescribeGraph(ctx context.Context,
req *lnrpc.ChannelGraphRequest) (*lnrpc.ChannelGraph, error) {
resp := &lnrpc.ChannelGraph{}
includeUnannounced := req.IncludeUnannounced
graph := r.server.chanDB.ChannelGraph()
err := graph.ForEachNode(nil, func(_ kvdb.ReadTx, node *channeldb.LightningNode) error {
nodeAddrs := make([]*lnrpc.NodeAddress, 0)
for _, addr := range node.Addresses {
nodeAddr := &lnrpc.NodeAddress{
Network: addr.Network(),
Addr: addr.String(),
}
nodeAddrs = append(nodeAddrs, nodeAddr)
}
lnNode := &lnrpc.LightningNode{
LastUpdate: uint32(node.LastUpdate.Unix()),
PubKey: hex.EncodeToString(node.PubKeyBytes[:]),
Addresses: nodeAddrs,
Alias: node.Alias,
Color: routing.EncodeHexColor(node.Color),
Features: invoicesrpc.CreateRPCFeatures(node.Features),
}
resp.Nodes = append(resp.Nodes, lnNode)
return nil
})
if err != nil {
return nil, err
}
err = graph.ForEachChannel(func(edgeInfo *channeldb.ChannelEdgeInfo,
c1, c2 *channeldb.ChannelEdgePolicy) error {
if !includeUnannounced && edgeInfo.AuthProof == nil {
return nil
}
edge := marshalDbEdge(edgeInfo, c1, c2)
resp.Edges = append(resp.Edges, edge)
return nil
})
if err != nil && err != channeldb.ErrGraphNoEdgesFound {
return nil, err
}
return resp, nil
}
func marshalDbEdge(edgeInfo *channeldb.ChannelEdgeInfo,
c1, c2 *channeldb.ChannelEdgePolicy) *lnrpc.ChannelEdge {
if bytes.Compare(edgeInfo.NodeKey2Bytes[:],
edgeInfo.NodeKey1Bytes[:]) < 0 {
c2, c1 = c1, c2
}
var lastUpdate int64
if c1 != nil {
lastUpdate = c1.LastUpdate.Unix()
}
if c2 != nil && c2.LastUpdate.Unix() > lastUpdate {
lastUpdate = c2.LastUpdate.Unix()
}
edge := &lnrpc.ChannelEdge{
ChannelId: edgeInfo.ChannelID,
ChanPoint: edgeInfo.ChannelPoint.String(),
LastUpdate: uint32(lastUpdate),
Node1Pub: hex.EncodeToString(edgeInfo.NodeKey1Bytes[:]),
Node2Pub: hex.EncodeToString(edgeInfo.NodeKey2Bytes[:]),
Capacity: int64(edgeInfo.Capacity),
}
if c1 != nil {
edge.Node1Policy = &lnrpc.RoutingPolicy{
TimeLockDelta: uint32(c1.TimeLockDelta),
MinHtlc: int64(c1.MinHTLC),
MaxHtlcMsat: uint64(c1.MaxHTLC),
FeeBaseMsat: int64(c1.FeeBaseMSat),
FeeRateMilliMsat: int64(c1.FeeProportionalMillionths),
Disabled: c1.ChannelFlags&lnwire.ChanUpdateDisabled != 0,
LastUpdate: uint32(c1.LastUpdate.Unix()),
}
}
if c2 != nil {
edge.Node2Policy = &lnrpc.RoutingPolicy{
TimeLockDelta: uint32(c2.TimeLockDelta),
MinHtlc: int64(c2.MinHTLC),
MaxHtlcMsat: uint64(c2.MaxHTLC),
FeeBaseMsat: int64(c2.FeeBaseMSat),
FeeRateMilliMsat: int64(c2.FeeProportionalMillionths),
Disabled: c2.ChannelFlags&lnwire.ChanUpdateDisabled != 0,
LastUpdate: uint32(c2.LastUpdate.Unix()),
}
}
return edge
}
func (r *rpcServer) GetNodeMetrics(ctx context.Context,
req *lnrpc.NodeMetricsRequest) (*lnrpc.NodeMetricsResponse, error) {
getCentrality := false
for _, t := range req.Types {
if t == lnrpc.NodeMetricType_BETWEENNESS_CENTRALITY {
getCentrality = true
}
}
if !getCentrality {
return nil, nil
}
resp := &lnrpc.NodeMetricsResponse{
BetweennessCentrality: make(map[string]*lnrpc.FloatMetric),
}
graph := r.server.chanDB.ChannelGraph()
channelGraph := autopilot.ChannelGraphFromDatabase(graph)
centralityMetric, err := autopilot.NewBetweennessCentralityMetric(
runtime.NumCPU(),
)
if err != nil {
return nil, err
}
if err := centralityMetric.Refresh(channelGraph); err != nil {
return nil, err
}
centrality := centralityMetric.GetMetric(true)
for nodeID, val := range centrality {
resp.BetweennessCentrality[hex.EncodeToString(nodeID[:])] =
&lnrpc.FloatMetric{
NormalizedValue: val,
}
}
centrality = centralityMetric.GetMetric(false)
for nodeID, val := range centrality {
resp.BetweennessCentrality[hex.EncodeToString(nodeID[:])].Value = val
}
return resp, nil
}
func (r *rpcServer) GetChanInfo(ctx context.Context,
in *lnrpc.ChanInfoRequest) (*lnrpc.ChannelEdge, error) {
graph := r.server.chanDB.ChannelGraph()
edgeInfo, edge1, edge2, err := graph.FetchChannelEdgesByID(in.ChanId)
if err != nil {
return nil, err
}
channelEdge := marshalDbEdge(edgeInfo, edge1, edge2)
return channelEdge, nil
}
func (r *rpcServer) GetNodeInfo(ctx context.Context,
in *lnrpc.NodeInfoRequest) (*lnrpc.NodeInfo, error) {
graph := r.server.chanDB.ChannelGraph()
pubKey, err := route.NewVertexFromStr(in.PubKey)
if err != nil {
return nil, err
}
node, err := graph.FetchLightningNode(nil, pubKey)
if err != nil {
return nil, err
}
var (
numChannels uint32
totalCapacity btcutil.Amount
channels []*lnrpc.ChannelEdge
)
if err := node.ForEachChannel(nil, func(_ kvdb.ReadTx,
edge *channeldb.ChannelEdgeInfo,
c1, c2 *channeldb.ChannelEdgePolicy) error {
numChannels++
totalCapacity += edge.Capacity
if in.IncludeChannels {
if edge.AuthProof == nil {
return nil
}
channelEdge := marshalDbEdge(edge, c1, c2)
channels = append(channels, channelEdge)
}
return nil
}); err != nil {
return nil, err
}
nodeAddrs := make([]*lnrpc.NodeAddress, 0)
for _, addr := range node.Addresses {
nodeAddr := &lnrpc.NodeAddress{
Network: addr.Network(),
Addr: addr.String(),
}
nodeAddrs = append(nodeAddrs, nodeAddr)
}
features := invoicesrpc.CreateRPCFeatures(node.Features)
return &lnrpc.NodeInfo{
Node: &lnrpc.LightningNode{
LastUpdate: uint32(node.LastUpdate.Unix()),
PubKey: in.PubKey,
Addresses: nodeAddrs,
Alias: node.Alias,
Color: routing.EncodeHexColor(node.Color),
Features: features,
},
NumChannels: numChannels,
TotalCapacity: int64(totalCapacity),
Channels: channels,
}, nil
}
func (r *rpcServer) QueryRoutes(ctx context.Context,
in *lnrpc.QueryRoutesRequest) (*lnrpc.QueryRoutesResponse, error) {
return r.routerBackend.QueryRoutes(ctx, in)
}
func (r *rpcServer) GetNetworkInfo(ctx context.Context,
_ *lnrpc.NetworkInfoRequest) (*lnrpc.NetworkInfo, error) {
graph := r.server.chanDB.ChannelGraph()
var (
numNodes uint32
numChannels uint32
maxChanOut uint32
totalNetworkCapacity btcutil.Amount
minChannelSize btcutil.Amount = math.MaxInt64
maxChannelSize btcutil.Amount
medianChanSize btcutil.Amount
)
seenChans := make(map[uint64]struct{})
var allChans []btcutil.Amount
if err := graph.ForEachNode(nil, func(tx kvdb.ReadTx, node *channeldb.LightningNode) error {
numNodes++
var outDegree uint32
if err := node.ForEachChannel(tx, func(_ kvdb.ReadTx,
edge *channeldb.ChannelEdgeInfo, _, _ *channeldb.ChannelEdgePolicy) error {
outDegree++
if _, ok := seenChans[edge.ChannelID]; ok {
return nil
}
chanCapacity := edge.Capacity
if chanCapacity < minChannelSize {
minChannelSize = chanCapacity
}
if chanCapacity > maxChannelSize {
maxChannelSize = chanCapacity
}
totalNetworkCapacity += chanCapacity
numChannels++
seenChans[edge.ChannelID] = struct{}{}
allChans = append(allChans, edge.Capacity)
return nil
}); err != nil {
return err
}
if outDegree > maxChanOut {
maxChanOut = outDegree
}
return nil
}); err != nil {
return nil, err
}
numZombies, err := graph.NumZombies()
if err != nil {
return nil, err
}
medianChanSize = autopilot.Median(allChans)
if numChannels == 0 {
minChannelSize = 0
}
netInfo := &lnrpc.NetworkInfo{
MaxOutDegree: maxChanOut,
AvgOutDegree: float64(2*numChannels) / float64(numNodes),
NumNodes: numNodes,
NumChannels: numChannels,
TotalNetworkCapacity: int64(totalNetworkCapacity),
AvgChannelSize: float64(totalNetworkCapacity) / float64(numChannels),
MinChannelSize: int64(minChannelSize),
MaxChannelSize: int64(maxChannelSize),
MedianChannelSizeSat: int64(medianChanSize),
NumZombieChans: numZombies,
}
if numChannels == 0 {
netInfo.AvgChannelSize = 0
}
return netInfo, nil
}
func (r *rpcServer) StopDaemon(ctx context.Context,
_ *lnrpc.StopRequest) (*lnrpc.StopResponse, error) {
signal.RequestShutdown()
return &lnrpc.StopResponse{}, nil
}
func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
updateStream lnrpc.Lightning_SubscribeChannelGraphServer) error {
client, err := r.server.chanRouter.SubscribeTopology()
if err != nil {
return err
}
defer client.Cancel()
for {
select {
case topChange, ok := <-client.TopologyChanges:
if !ok {
return errors.New("server shutting down")
}
graphUpdate := marshallTopologyChange(topChange)
if err := updateStream.Send(graphUpdate); err != nil {
return err
}
case <-r.quit:
return nil
}
}
}
func marshallTopologyChange(topChange *routing.TopologyChange) *lnrpc.GraphTopologyUpdate {
encodeKey := func(k *btcec.PublicKey) string {
return hex.EncodeToString(k.SerializeCompressed())
}
nodeUpdates := make([]*lnrpc.NodeUpdate, len(topChange.NodeUpdates))
for i, nodeUpdate := range topChange.NodeUpdates {
addrs := make([]string, len(nodeUpdate.Addresses))
for i, addr := range nodeUpdate.Addresses {
addrs[i] = addr.String()
}
nodeUpdates[i] = &lnrpc.NodeUpdate{
Addresses: addrs,
IdentityKey: encodeKey(nodeUpdate.IdentityKey),
GlobalFeatures: nodeUpdate.GlobalFeatures,
Alias: nodeUpdate.Alias,
Color: nodeUpdate.Color,
}
}
channelUpdates := make([]*lnrpc.ChannelEdgeUpdate, len(topChange.ChannelEdgeUpdates))
for i, channelUpdate := range topChange.ChannelEdgeUpdates {
channelUpdates[i] = &lnrpc.ChannelEdgeUpdate{
ChanId: channelUpdate.ChanID,
ChanPoint: &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: channelUpdate.ChanPoint.Hash[:],
},
OutputIndex: channelUpdate.ChanPoint.Index,
},
Capacity: int64(channelUpdate.Capacity),
RoutingPolicy: &lnrpc.RoutingPolicy{
TimeLockDelta: uint32(channelUpdate.TimeLockDelta),
MinHtlc: int64(channelUpdate.MinHTLC),
MaxHtlcMsat: uint64(channelUpdate.MaxHTLC),
FeeBaseMsat: int64(channelUpdate.BaseFee),
FeeRateMilliMsat: int64(channelUpdate.FeeRate),
Disabled: channelUpdate.Disabled,
},
AdvertisingNode: encodeKey(channelUpdate.AdvertisingNode),
ConnectingNode: encodeKey(channelUpdate.ConnectingNode),
}
}
closedChans := make([]*lnrpc.ClosedChannelUpdate, len(topChange.ClosedChannels))
for i, closedChan := range topChange.ClosedChannels {
closedChans[i] = &lnrpc.ClosedChannelUpdate{
ChanId: closedChan.ChanID,
Capacity: int64(closedChan.Capacity),
ClosedHeight: closedChan.ClosedHeight,
ChanPoint: &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: closedChan.ChanPoint.Hash[:],
},
OutputIndex: closedChan.ChanPoint.Index,
},
}
}
return &lnrpc.GraphTopologyUpdate{
NodeUpdates: nodeUpdates,
ChannelUpdates: channelUpdates,
ClosedChans: closedChans,
}
}
func (r *rpcServer) ListPayments(ctx context.Context,
req *lnrpc.ListPaymentsRequest) (*lnrpc.ListPaymentsResponse, error) {
rpcsLog.Debugf("[ListPayments]")
query := channeldb.PaymentsQuery{
IndexOffset: req.IndexOffset,
MaxPayments: req.MaxPayments,
Reversed: req.Reversed,
IncludeIncomplete: req.IncludeIncomplete,
}
if req.MaxPayments == 0 {
query.MaxPayments = math.MaxUint64
}
paymentsQuerySlice, err := r.server.chanDB.QueryPayments(query)
if err != nil {
return nil, err
}
paymentsResp := &lnrpc.ListPaymentsResponse{
LastIndexOffset: paymentsQuerySlice.LastIndexOffset,
FirstIndexOffset: paymentsQuerySlice.FirstIndexOffset,
}
for _, payment := range paymentsQuerySlice.Payments {
payment := payment
rpcPayment, err := r.routerBackend.MarshallPayment(payment)
if err != nil {
return nil, err
}
paymentsResp.Payments = append(
paymentsResp.Payments, rpcPayment,
)
}
return paymentsResp, nil
}
func (r *rpcServer) DeleteAllPayments(ctx context.Context,
_ *lnrpc.DeleteAllPaymentsRequest) (*lnrpc.DeleteAllPaymentsResponse, error) {
rpcsLog.Debugf("[DeleteAllPayments]")
if err := r.server.chanDB.DeletePayments(); err != nil {
return nil, err
}
return &lnrpc.DeleteAllPaymentsResponse{}, nil
}
func (r *rpcServer) DebugLevel(ctx context.Context,
req *lnrpc.DebugLevelRequest) (*lnrpc.DebugLevelResponse, error) {
if req.Show {
return &lnrpc.DebugLevelResponse{
SubSystems: strings.Join(
logWriter.SupportedSubsystems(), " ",
),
}, nil
}
rpcsLog.Infof("[debuglevel] changing debug level to: %v", req.LevelSpec)
err := build.ParseAndSetDebugLevels(req.LevelSpec, logWriter)
if err != nil {
return nil, err
}
return &lnrpc.DebugLevelResponse{}, nil
}
func (r *rpcServer) DecodePayReq(ctx context.Context,
req *lnrpc.PayReqString) (*lnrpc.PayReq, error) {
rpcsLog.Tracef("[decodepayreq] decoding: %v", req.PayReq)
payReq, err := zpay32.Decode(req.PayReq, activeNetParams.Params)
if err != nil {
return nil, err
}
desc := ""
if payReq.Description != nil {
desc = *payReq.Description
}
descHash := []byte("")
if payReq.DescriptionHash != nil {
descHash = payReq.DescriptionHash[:]
}
fallbackAddr := ""
if payReq.FallbackAddr != nil {
fallbackAddr = payReq.FallbackAddr.String()
}
expiry := int64(payReq.Expiry().Seconds())
routeHints := invoicesrpc.CreateRPCRouteHints(payReq.RouteHints)
var amtSat, amtMsat int64
if payReq.MilliSat != nil {
amtSat = int64(payReq.MilliSat.ToSatoshis())
amtMsat = int64(*payReq.MilliSat)
}
var paymentAddr []byte
if payReq.PaymentAddr != nil {
paymentAddr = payReq.PaymentAddr[:]
}
dest := payReq.Destination.SerializeCompressed()
return &lnrpc.PayReq{
Destination: hex.EncodeToString(dest),
PaymentHash: hex.EncodeToString(payReq.PaymentHash[:]),
NumSatoshis: amtSat,
NumMsat: amtMsat,
Timestamp: payReq.Timestamp.Unix(),
Description: desc,
DescriptionHash: hex.EncodeToString(descHash[:]),
FallbackAddr: fallbackAddr,
Expiry: expiry,
CltvExpiry: int64(payReq.MinFinalCLTVExpiry()),
RouteHints: routeHints,
PaymentAddr: paymentAddr,
Features: invoicesrpc.CreateRPCFeatures(payReq.Features),
}, nil
}
const feeBase = 1000000
func (r *rpcServer) FeeReport(ctx context.Context,
_ *lnrpc.FeeReportRequest) (*lnrpc.FeeReportResponse, error) {
rpcsLog.Debugf("[feereport]")
channelGraph := r.server.chanDB.ChannelGraph()
selfNode, err := channelGraph.SourceNode()
if err != nil {
return nil, err
}
var feeReports []*lnrpc.ChannelFeeReport
err = selfNode.ForEachChannel(nil, func(_ kvdb.ReadTx, chanInfo *channeldb.ChannelEdgeInfo,
edgePolicy, _ *channeldb.ChannelEdgePolicy) error {
if edgePolicy == nil {
return fmt.Errorf("no policy for outgoing channel %v ",
chanInfo.ChannelID)
}
feeRateFixedPoint := edgePolicy.FeeProportionalMillionths
feeRate := float64(feeRateFixedPoint) / float64(feeBase)
feeReports = append(feeReports, &lnrpc.ChannelFeeReport{
ChanId: chanInfo.ChannelID,
ChannelPoint: chanInfo.ChannelPoint.String(),
BaseFeeMsat: int64(edgePolicy.FeeBaseMSat),
FeePerMil: int64(feeRateFixedPoint),
FeeRate: feeRate,
})
return nil
})
if err != nil {
return nil, err
}
fwdEventLog := r.server.chanDB.ForwardingLog()
computeFeeSum := func(query channeldb.ForwardingEventQuery) (lnwire.MilliSatoshi, error) {
var totalFees lnwire.MilliSatoshi
for {
timeSlice, err := fwdEventLog.Query(query)
if err != nil {
return 0, err
}
if len(timeSlice.ForwardingEvents) == 0 {
break
}
for _, event := range timeSlice.ForwardingEvents {
fee := event.AmtIn - event.AmtOut
totalFees += fee
}
query.IndexOffset = timeSlice.LastIndexOffset
}
return totalFees, nil
}
now := time.Now()
if err := r.server.htlcSwitch.FlushForwardingEvents(); err != nil {
return nil, fmt.Errorf("unable to flush forwarding "+
"events: %v", err)
}
dayQuery := channeldb.ForwardingEventQuery{
StartTime: now.Add(-time.Hour * 24),
EndTime: now,
NumMaxEvents: 1000,
}
dayFees, err := computeFeeSum(dayQuery)
if err != nil {
return nil, fmt.Errorf("unable to retrieve day fees: %v", err)
}
weekQuery := channeldb.ForwardingEventQuery{
StartTime: now.Add(-time.Hour * 24 * 7),
EndTime: now,
NumMaxEvents: 1000,
}
weekFees, err := computeFeeSum(weekQuery)
if err != nil {
return nil, fmt.Errorf("unable to retrieve day fees: %v", err)
}
monthQuery := channeldb.ForwardingEventQuery{
StartTime: now.Add(-time.Hour * 24 * 30),
EndTime: now,
NumMaxEvents: 1000,
}
monthFees, err := computeFeeSum(monthQuery)
if err != nil {
return nil, fmt.Errorf("unable to retrieve day fees: %v", err)
}
return &lnrpc.FeeReportResponse{
ChannelFees: feeReports,
DayFeeSum: uint64(dayFees.ToSatoshis()),
WeekFeeSum: uint64(weekFees.ToSatoshis()),
MonthFeeSum: uint64(monthFees.ToSatoshis()),
}, nil
}
const minFeeRate = 1e-6
func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
req *lnrpc.PolicyUpdateRequest) (*lnrpc.PolicyUpdateResponse, error) {
var targetChans []wire.OutPoint
switch scope := req.Scope.(type) {
case *lnrpc.PolicyUpdateRequest_Global:
case *lnrpc.PolicyUpdateRequest_ChanPoint:
txid, err := GetChanPointFundingTxid(scope.ChanPoint)
if err != nil {
return nil, err
}
targetChans = append(targetChans, wire.OutPoint{
Hash: *txid,
Index: scope.ChanPoint.OutputIndex,
})
default:
return nil, fmt.Errorf("unknown scope: %v", scope)
}
switch {
case req.FeeRate != 0 && req.FeeRate < minFeeRate:
return nil, fmt.Errorf("fee rate of %v is too small, min fee "+
"rate is %v", req.FeeRate, minFeeRate)
case req.TimeLockDelta < minTimeLockDelta:
return nil, fmt.Errorf("time lock delta of %v is too small, "+
"minimum supported is %v", req.TimeLockDelta,
minTimeLockDelta)
}
feeRateFixed := uint32(req.FeeRate * feeBase)
baseFeeMsat := lnwire.MilliSatoshi(req.BaseFeeMsat)
feeSchema := routing.FeeSchema{
BaseFee: baseFeeMsat,
FeeRate: feeRateFixed,
}
maxHtlc := lnwire.MilliSatoshi(req.MaxHtlcMsat)
var minHtlc *lnwire.MilliSatoshi
if req.MinHtlcMsatSpecified {
min := lnwire.MilliSatoshi(req.MinHtlcMsat)
minHtlc = &min
}
chanPolicy := routing.ChannelPolicy{
FeeSchema: feeSchema,
TimeLockDelta: req.TimeLockDelta,
MaxHTLC: maxHtlc,
MinHTLC: minHtlc,
}
rpcsLog.Debugf("[updatechanpolicy] updating channel policy base_fee=%v, "+
"rate_float=%v, rate_fixed=%v, time_lock_delta: %v, "+
"min_htlc=%v, max_htlc=%v, targets=%v",
req.BaseFeeMsat, req.FeeRate, feeRateFixed, req.TimeLockDelta,
minHtlc, maxHtlc,
spew.Sdump(targetChans))
err := r.server.localChanMgr.UpdatePolicy(chanPolicy, targetChans...)
if err != nil {
return nil, err
}
return &lnrpc.PolicyUpdateResponse{}, nil
}
func (r *rpcServer) ForwardingHistory(ctx context.Context,
req *lnrpc.ForwardingHistoryRequest) (*lnrpc.ForwardingHistoryResponse, error) {
rpcsLog.Debugf("[forwardinghistory]")
if err := r.server.htlcSwitch.FlushForwardingEvents(); err != nil {
return nil, fmt.Errorf("unable to flush forwarding "+
"events: %v", err)
}
var (
startTime, endTime time.Time
numEvents uint32
)
startTime = time.Unix(int64(req.StartTime), 0)
if req.EndTime == 0 {
now := time.Now()
endTime = now
} else {
endTime = time.Unix(int64(req.EndTime), 0)
}
numEvents = req.NumMaxEvents
if numEvents == 0 {
numEvents = 100
}
eventQuery := channeldb.ForwardingEventQuery{
StartTime: startTime,
EndTime: endTime,
IndexOffset: req.IndexOffset,
NumMaxEvents: numEvents,
}
timeSlice, err := r.server.chanDB.ForwardingLog().Query(eventQuery)
if err != nil {
return nil, fmt.Errorf("unable to query forwarding log: %v", err)
}
resp := &lnrpc.ForwardingHistoryResponse{
ForwardingEvents: make([]*lnrpc.ForwardingEvent, len(timeSlice.ForwardingEvents)),
LastOffsetIndex: timeSlice.LastIndexOffset,
}
for i, event := range timeSlice.ForwardingEvents {
amtInMsat := event.AmtIn
amtOutMsat := event.AmtOut
feeMsat := event.AmtIn - event.AmtOut
resp.ForwardingEvents[i] = &lnrpc.ForwardingEvent{
Timestamp: uint64(event.Timestamp.Unix()),
ChanIdIn: event.IncomingChanID.ToUint64(),
ChanIdOut: event.OutgoingChanID.ToUint64(),
AmtIn: uint64(amtInMsat.ToSatoshis()),
AmtOut: uint64(amtOutMsat.ToSatoshis()),
Fee: uint64(feeMsat.ToSatoshis()),
FeeMsat: uint64(feeMsat),
AmtInMsat: uint64(amtInMsat),
AmtOutMsat: uint64(amtOutMsat),
}
}
return resp, nil
}
func (r *rpcServer) ExportChannelBackup(ctx context.Context,
in *lnrpc.ExportChannelBackupRequest) (*lnrpc.ChannelBackup, error) {
txid, err := GetChanPointFundingTxid(in.ChanPoint)
if err != nil {
return nil, err
}
chanPoint := wire.OutPoint{
Hash: *txid,
Index: in.ChanPoint.OutputIndex,
}
unpackedBackup, err := chanbackup.FetchBackupForChan(
chanPoint, r.server.chanDB,
)
if err != nil {
return nil, err
}
packedBackups, err := chanbackup.PackStaticChanBackups(
[]chanbackup.Single{*unpackedBackup},
r.server.cc.keyRing,
)
if err != nil {
return nil, fmt.Errorf("packing of back ups failed: %v", err)
}
packedBackup, ok := packedBackups[chanPoint]
if !ok {
return nil, fmt.Errorf("expected single backup for "+
"ChannelPoint(%v), got %v", chanPoint,
len(packedBackup))
}
return &lnrpc.ChannelBackup{
ChanPoint: in.ChanPoint,
ChanBackup: packedBackup,
}, nil
}
func (r *rpcServer) VerifyChanBackup(ctx context.Context,
in *lnrpc.ChanBackupSnapshot) (*lnrpc.VerifyChanBackupResponse, error) {
switch {
case in.GetSingleChanBackups() == nil && in.GetMultiChanBackup() == nil:
return nil, errors.New("either a Single or Multi channel " +
"backup must be specified")
case in.GetSingleChanBackups() != nil && in.GetMultiChanBackup() != nil:
return nil, errors.New("either a Single or Multi channel " +
"backup must be specified, but not both")
case in.GetSingleChanBackups() != nil:
chanBackupsProtos := in.GetSingleChanBackups().ChanBackups
if len(chanBackupsProtos) != 1 {
return nil, errors.New("only one Single is accepted " +
"at a time")
}
chanBackup := chanbackup.PackedSingles(
[][]byte{chanBackupsProtos[0].ChanBackup},
)
_, err := chanBackup.Unpack(r.server.cc.keyRing)
if err != nil {
return nil, fmt.Errorf("invalid single channel "+
"backup: %v", err)
}
case in.GetMultiChanBackup() != nil:
packedMultiBackup := in.GetMultiChanBackup().MultiChanBackup
packedMulti := chanbackup.PackedMulti(packedMultiBackup)
_, err := packedMulti.Unpack(r.server.cc.keyRing)
if err != nil {
return nil, fmt.Errorf("invalid multi channel backup: "+
"%v", err)
}
}
return &lnrpc.VerifyChanBackupResponse{}, nil
}
func (r *rpcServer) createBackupSnapshot(backups []chanbackup.Single) (
*lnrpc.ChanBackupSnapshot, error) {
singleChanPackedBackups, err := chanbackup.PackStaticChanBackups(
backups, r.server.cc.keyRing,
)
if err != nil {
return nil, fmt.Errorf("unable to pack set of chan "+
"backups: %v", err)
}
numBackups := len(singleChanPackedBackups)
singleBackupResp := &lnrpc.ChannelBackups{
ChanBackups: make([]*lnrpc.ChannelBackup, 0, numBackups),
}
for chanPoint, singlePackedBackup := range singleChanPackedBackups {
txid := chanPoint.Hash
rpcChanPoint := &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: txid[:],
},
OutputIndex: chanPoint.Index,
}
singleBackupResp.ChanBackups = append(
singleBackupResp.ChanBackups,
&lnrpc.ChannelBackup{
ChanPoint: rpcChanPoint,
ChanBackup: singlePackedBackup,
},
)
}
var b bytes.Buffer
unpackedMultiBackup := chanbackup.Multi{
StaticBackups: backups,
}
err = unpackedMultiBackup.PackToWriter(&b, r.server.cc.keyRing)
if err != nil {
return nil, fmt.Errorf("unable to multi-pack backups: %v", err)
}
multiBackupResp := &lnrpc.MultiChanBackup{
MultiChanBackup: b.Bytes(),
}
for _, singleBackup := range singleBackupResp.ChanBackups {
multiBackupResp.ChanPoints = append(
multiBackupResp.ChanPoints, singleBackup.ChanPoint,
)
}
return &lnrpc.ChanBackupSnapshot{
SingleChanBackups: singleBackupResp,
MultiChanBackup: multiBackupResp,
}, nil
}
func (r *rpcServer) ExportAllChannelBackups(ctx context.Context,
in *lnrpc.ChanBackupExportRequest) (*lnrpc.ChanBackupSnapshot, error) {
allUnpackedBackups, err := chanbackup.FetchStaticChanBackups(
r.server.chanDB,
)
if err != nil {
return nil, fmt.Errorf("unable to fetch all static chan "+
"backups: %v", err)
}
return r.createBackupSnapshot(allUnpackedBackups)
}
func (r *rpcServer) RestoreChannelBackups(ctx context.Context,
in *lnrpc.RestoreChanBackupRequest) (*lnrpc.RestoreBackupResponse, error) {
chanRestorer := &chanDBRestorer{
db: r.server.chanDB,
secretKeys: r.server.cc.keyRing,
chainArb: r.server.chainArb,
}
switch {
case in.GetChanBackups() != nil:
chanBackupsProtos := in.GetChanBackups()
packedBackups := make([][]byte, 0, len(chanBackupsProtos.ChanBackups))
for _, chanBackup := range chanBackupsProtos.ChanBackups {
packedBackups = append(
packedBackups, chanBackup.ChanBackup,
)
}
err := chanbackup.UnpackAndRecoverSingles(
chanbackup.PackedSingles(packedBackups),
r.server.cc.keyRing, chanRestorer, r.server,
)
if err != nil {
return nil, fmt.Errorf("unable to unpack single "+
"backups: %v", err)
}
case in.GetMultiChanBackup() != nil:
packedMultiBackup := in.GetMultiChanBackup()
packedMulti := chanbackup.PackedMulti(packedMultiBackup)
err := chanbackup.UnpackAndRecoverMulti(
packedMulti, r.server.cc.keyRing, chanRestorer,
r.server,
)
if err != nil {
return nil, fmt.Errorf("unable to unpack chan "+
"backup: %v", err)
}
}
return &lnrpc.RestoreBackupResponse{}, nil
}
func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription,
updateStream lnrpc.Lightning_SubscribeChannelBackupsServer) error {
chanSubscription, err := r.server.channelNotifier.SubscribeChannelEvents()
if err != nil {
return err
}
defer chanSubscription.Cancel()
for {
select {
case e := <-chanSubscription.Updates():
switch e.(type) {
case channelnotifier.ActiveChannelEvent:
continue
case channelnotifier.InactiveChannelEvent:
continue
case channelnotifier.ActiveLinkEvent:
continue
}
chanBackups, err := chanbackup.FetchStaticChanBackups(
r.server.chanDB,
)
if err != nil {
return fmt.Errorf("unable to fetch all "+
"static chan backups: %v", err)
}
backupSnapshot, err := r.createBackupSnapshot(
chanBackups,
)
if err != nil {
return err
}
err = updateStream.Send(backupSnapshot)
if err != nil {
return err
}
case <-r.quit:
return nil
}
}
}
type chanAcceptInfo struct {
chanReq *chanacceptor.ChannelAcceptRequest
responseChan chan bool
}
func (r *rpcServer) ChannelAcceptor(stream lnrpc.Lightning_ChannelAcceptorServer) error {
chainedAcceptor := r.chanPredicate
newRequests := make(chan *chanAcceptInfo)
responses := make(chan lnrpc.ChannelAcceptResponse)
quit := make(chan struct{})
defer close(quit)
demultiplexReq := func(req *chanacceptor.ChannelAcceptRequest) bool {
respChan := make(chan bool, 1)
newRequest := &chanAcceptInfo{
chanReq: req,
responseChan: respChan,
}
timeout := time.After(cfg.AcceptorTimeout)
select {
case newRequests <- newRequest:
case <-timeout:
rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d",
cfg.AcceptorTimeout)
return false
case <-quit:
return false
case <-r.quit:
return false
}
select {
case resp := <-respChan:
return resp
case <-timeout:
rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d",
cfg.AcceptorTimeout)
return false
case <-quit:
return false
case <-r.quit:
return false
}
}
rpcAcceptor := chanacceptor.NewRPCAcceptor(demultiplexReq)
id := chainedAcceptor.AddAcceptor(rpcAcceptor)
defer chainedAcceptor.RemoveAcceptor(id)
errChan := make(chan error, 1)
go func() {
for {
resp, err := stream.Recv()
if err != nil {
errChan <- err
return
}
var pendingID [32]byte
copy(pendingID[:], resp.PendingChanId)
openChanResp := lnrpc.ChannelAcceptResponse{
Accept: resp.Accept,
PendingChanId: pendingID[:],
}
select {
case responses <- openChanResp:
case <-quit:
return
case <-r.quit:
return
}
}
}()
acceptRequests := make(map[[32]byte]chan bool)
for {
select {
case newRequest := <-newRequests:
req := newRequest.chanReq
pendingChanID := req.OpenChanMsg.PendingChannelID
acceptRequests[pendingChanID] = newRequest.responseChan
chanAcceptReq := &lnrpc.ChannelAcceptRequest{
NodePubkey: req.Node.SerializeCompressed(),
ChainHash: req.OpenChanMsg.ChainHash[:],
PendingChanId: req.OpenChanMsg.PendingChannelID[:],
FundingAmt: uint64(req.OpenChanMsg.FundingAmount),
PushAmt: uint64(req.OpenChanMsg.PushAmount),
DustLimit: uint64(req.OpenChanMsg.DustLimit),
MaxValueInFlight: uint64(req.OpenChanMsg.MaxValueInFlight),
ChannelReserve: uint64(req.OpenChanMsg.ChannelReserve),
MinHtlc: uint64(req.OpenChanMsg.HtlcMinimum),
FeePerKw: uint64(req.OpenChanMsg.FeePerKiloWeight),
CsvDelay: uint32(req.OpenChanMsg.CsvDelay),
MaxAcceptedHtlcs: uint32(req.OpenChanMsg.MaxAcceptedHTLCs),
ChannelFlags: uint32(req.OpenChanMsg.ChannelFlags),
}
if err := stream.Send(chanAcceptReq); err != nil {
return err
}
case resp := <-responses:
var pendingID [32]byte
copy(pendingID[:], resp.PendingChanId)
respChan, ok := acceptRequests[pendingID]
if !ok {
continue
}
respChan <- resp.Accept
delete(acceptRequests, pendingID)
case err := <-errChan:
rpcsLog.Errorf("Received an error: %v, shutting down", err)
return err
case <-r.quit:
return fmt.Errorf("RPC server is shutting down")
}
}
}
func (r *rpcServer) BakeMacaroon(ctx context.Context,
req *lnrpc.BakeMacaroonRequest) (*lnrpc.BakeMacaroonResponse, error) {
rpcsLog.Debugf("[bakemacaroon]")
if r.macService == nil {
return nil, fmt.Errorf("macaroon authentication disabled, " +
"remove --no-macaroons flag to enable")
}
helpMsg := fmt.Sprintf("supported actions are %v, supported entities "+
"are %v", validActions, validEntities)
if len(req.Permissions) == 0 {
return nil, fmt.Errorf("permission list cannot be empty. "+
"specify at least one action/entity pair. %s", helpMsg)
}
requestedPermissions := make([]bakery.Op, len(req.Permissions))
for idx, op := range req.Permissions {
if !stringInSlice(op.Action, validActions) {
return nil, fmt.Errorf("invalid permission action. %s",
helpMsg)
}
if !stringInSlice(op.Entity, validEntities) {
return nil, fmt.Errorf("invalid permission entity. %s",
helpMsg)
}
requestedPermissions[idx] = bakery.Op{
Entity: op.Entity,
Action: op.Action,
}
}
newMac, err := r.macService.Oven.NewMacaroon(
ctx, bakery.LatestVersion, nil, requestedPermissions...,
)
if err != nil {
return nil, err
}
newMacBytes, err := newMac.M().MarshalBinary()
if err != nil {
return nil, err
}
resp := &lnrpc.BakeMacaroonResponse{}
resp.Macaroon = hex.EncodeToString(newMacBytes)
return resp, nil
}
func (r *rpcServer) FundingStateStep(ctx context.Context,
in *lnrpc.FundingTransitionMsg) (*lnrpc.FundingStateStepResp, error) {
var pendingChanID [32]byte
switch {
case in.GetShimRegister() != nil &&
in.GetShimRegister().GetChanPointShim() != nil:
rpcShimIntent := in.GetShimRegister().GetChanPointShim()
shimAssembler, err := newFundingShimAssembler(
rpcShimIntent, false, r.server.cc.keyRing,
)
if err != nil {
return nil, err
}
req := &chanfunding.Request{
RemoteAmt: btcutil.Amount(rpcShimIntent.Amt),
}
shimIntent, err := shimAssembler.ProvisionChannel(req)
if err != nil {
return nil, err
}
copy(pendingChanID[:], rpcShimIntent.PendingChanId)
err = r.server.cc.wallet.RegisterFundingIntent(
pendingChanID, shimIntent,
)
if err != nil {
return nil, err
}
case in.GetShimRegister() != nil &&
in.GetShimRegister().GetPsbtShim() != nil:
return nil, fmt.Errorf("PSBT shim must only be sent when " +
"opening a channel")
case in.GetShimCancel() != nil:
rpcsLog.Debugf("Canceling funding shim for pending_id=%x",
in.GetShimCancel().PendingChanId)
copy(pendingChanID[:], in.GetShimCancel().PendingChanId)
err := r.server.cc.wallet.CancelFundingIntent(pendingChanID)
if err != nil {
return nil, err
}
case in.GetPsbtVerify() != nil:
rpcsLog.Debugf("Verifying PSBT for pending_id=%x",
in.GetPsbtVerify().PendingChanId)
copy(pendingChanID[:], in.GetPsbtVerify().PendingChanId)
packet, err := psbt.NewFromRawBytes(
bytes.NewReader(in.GetPsbtVerify().FundedPsbt), false,
)
if err != nil {
return nil, fmt.Errorf("error parsing psbt: %v", err)
}
err = r.server.cc.wallet.PsbtFundingVerify(
pendingChanID, packet,
)
if err != nil {
return nil, err
}
case in.GetPsbtFinalize() != nil:
rpcsLog.Debugf("Finalizing PSBT for pending_id=%x",
in.GetPsbtFinalize().PendingChanId)
copy(pendingChanID[:], in.GetPsbtFinalize().PendingChanId)
packet, err := psbt.NewFromRawBytes(
bytes.NewReader(in.GetPsbtFinalize().SignedPsbt), false,
)
if err != nil {
return nil, fmt.Errorf("error parsing psbt: %v", err)
}
err = r.server.cc.wallet.PsbtFundingFinalize(
pendingChanID, packet,
)
if err != nil {
return nil, err
}
}
return &lnrpc.FundingStateStepResp{}, nil
}