vls_proxy/
portfront.rs

1//! The SignerPortFront and NodePortFront provide a client RPC interface to the
2//! core MultiSigner and Node objects via a communications link.
3
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use async_trait::async_trait;
8
9use bitcoin::bip32::ExtendedPubKey;
10use bitcoin::blockdata::block::Header as BlockHeader;
11use bitcoin::consensus::serialize;
12use bitcoin::secp256k1::PublicKey;
13use bitcoin::{BlockHash, Network, OutPoint, Txid};
14use lightning_signer::bitcoin;
15
16use vls_frontend::{ChainTrack, ChainTrackDirectory};
17use vls_protocol::msgs::{self, DebugTxoProof, Message, SerBolt};
18use vls_protocol::serde_bolt::{self, LargeOctets, Octets};
19use vls_protocol_client::SignerPort;
20
21use lightning_signer::chain::tracker::Headers;
22use lightning_signer::node::SignedHeartbeat;
23use lightning_signer::txoo::proof::TxoProof;
24use log::*;
25
/// Implements ChainTrackDirectory using RPC to remote MultiSigner
pub struct SignerPortFront {
    /// Port over which requests are sent to the signer.
    pub signer_port: Arc<dyn SignerPort>,
    /// Bitcoin network this front end was constructed with.
    pub network: Network,
    /// Chain trackers handed out by `ChainTrackDirectory::trackers`; `new` fills
    /// this with a single `NodePortFront` sharing `signer_port`.
    pub trackers: Vec<Arc<dyn ChainTrack>>,
}
32
33impl SignerPortFront {
34    pub fn new(signer_port: Arc<dyn SignerPort>, network: Network) -> Self {
35        let front = NodePortFront::new(signer_port.clone(), network);
36        let trackers = vec![Arc::new(front) as Arc<dyn ChainTrack>];
37        SignerPortFront { signer_port, network, trackers }
38    }
39}
40
41#[async_trait]
42impl ChainTrackDirectory for SignerPortFront {
43    fn tracker(&self, _node_id: &PublicKey) -> Arc<dyn ChainTrack> {
44        unimplemented!()
45    }
46
47    async fn trackers(&self) -> Vec<Arc<dyn ChainTrack>> {
48        self.trackers.clone()
49    }
50}
51
/// Public keys obtained from the signer's `NodeInfoReply`, cached by
/// `NodePortFront` after the first lookup.
#[derive(Clone)]
struct NodeKeys {
    /// Node id, parsed from the reply's `node_id` field.
    node_id: PublicKey,
    /// Key returned by `heartbeat_pubkey()`: the public key of the BIP32
    /// extended public key carried in the reply.
    heartbeat_pubkey: PublicKey,
}
57
/// Implements ChainTrack using RPC to remote node
pub(crate) struct NodePortFront {
    /// Port over which requests are sent to the signer.
    signer_port: Arc<dyn SignerPort>,
    /// Bitcoin network reported by `ChainTrack::network`.
    network: Network,
    /// Keys fetched from the signer; `None` until `populate_keys` has run.
    node_keys: Mutex<Option<NodeKeys>>,
    /// Maximum number of serialized block bytes sent per `BlockChunk` message.
    block_chunk_size: usize,
}
65
/// `wait_ready` logs its "waiting" message once every this many polling attempts.
const LOG_INTERVAL: u64 = 100;

/// blocks will be streamed in chunks of this size (in bytes of the serialized
/// block), unless the test streaming mode selects a smaller size
const BLOCK_CHUNK_SIZE: usize = 8192;
70
71impl NodePortFront {
72    fn new(signer_port: Arc<dyn SignerPort>, network: Network) -> Self {
73        debug!("NodePortFront::new network: {}", network);
74        let test_streaming = std::env::var("VLS_CHAINFOLLOWER_TEST_STREAMING")
75            .map(|s| s == "1" || s == "true")
76            .unwrap_or(false);
77        let block_chunk_size = if test_streaming {
78            // create more chunks for better system testing
79            1223
80        } else {
81            BLOCK_CHUNK_SIZE
82        };
83
84        Self { signer_port, network, node_keys: Mutex::new(None), block_chunk_size }
85    }
86
87    async fn wait_ready(&self) {
88        let mut attempt: u64 = 0;
89        while !self.signer_port.is_ready() {
90            if attempt % LOG_INTERVAL == 0 {
91                info!("waiting for signer_port to be ready");
92            }
93            attempt += 1;
94            tokio::time::sleep(Duration::from_millis(100)).await;
95        }
96        if attempt > 0 {
97            info!("signer_port is now ready");
98        }
99    }
100
101    async fn populate_keys(&self) -> NodeKeys {
102        self.wait_ready().await;
103        let reply = self
104            .signer_port
105            .handle_message(msgs::NodeInfo {}.as_vec())
106            .await
107            .expect("NodeInfo failed");
108        if let Ok(Message::NodeInfoReply(m)) = msgs::from_vec(reply) {
109            let xpubkey = ExtendedPubKey::decode(&m.bip32.0).expect("NodeInfoReply bip32 xpubkey");
110            let heartbeat_pubkey = xpubkey.public_key;
111            let node_id = PublicKey::from_slice(&m.node_id.0).expect("NodeInfoReply node_id");
112            let mut lock = self.node_keys.lock().unwrap();
113            let keys = NodeKeys { node_id, heartbeat_pubkey };
114            *lock = Some(keys.clone());
115            return keys;
116        } else {
117            panic!("unexpected NodeInfoReply");
118        }
119    }
120
121    async fn maybe_stream_block(&self, proof: TxoProof) -> TxoProof {
122        // stream the block to the signer, if this is a false positive
123        let (proof, block_opt) = proof.take_block();
124        if let Some(block) = block_opt {
125            let block_hash = block.block_hash();
126            let bytes = serialize(&block);
127            let mut offset = 0;
128            for chunk in bytes.chunks(self.block_chunk_size) {
129                let req =
130                    msgs::BlockChunk { hash: block_hash, offset, content: Octets(chunk.to_vec()) };
131                let reply_bytes =
132                    self.signer_port.handle_message(req.as_vec()).await.expect("BlockChunk failed");
133                let result = msgs::from_vec(reply_bytes);
134                match result {
135                    Ok(Message::BlockChunkReply(_)) => {}
136                    _ => panic!("unexpected {:?} when looking for BlockChunkReply", result),
137                }
138                offset += chunk.len() as u32;
139            }
140        }
141        proof
142    }
143}
144
145#[async_trait]
146impl ChainTrack for NodePortFront {
147    fn log_prefix(&self) -> String {
148        let lock = self.node_keys.lock().unwrap();
149        if let Some(nk) = lock.as_ref() {
150            let id = nk.node_id.serialize().to_vec();
151            format!("tracker {}", hex::encode(&id[0..4]))
152        } else {
153            format!("tracker")
154        }
155    }
156
157    async fn id(&self) -> Vec<u8> {
158        {
159            let lock = self.node_keys.lock().unwrap();
160            if let Some(nk) = lock.as_ref() {
161                return nk.node_id.serialize().to_vec();
162            }
163        }
164        let keys = self.populate_keys().await;
165        let idvec = keys.node_id.serialize().to_vec();
166        debug!("NodePortFront::id {}", hex::encode(&idvec));
167        idvec
168    }
169
170    async fn heartbeat_pubkey(&self) -> PublicKey {
171        {
172            let lock = self.node_keys.lock().unwrap();
173            if let Some(nk) = lock.as_ref() {
174                return nk.heartbeat_pubkey.clone();
175            }
176        }
177        let keys = self.populate_keys().await;
178        let pubkey = keys.heartbeat_pubkey;
179        debug!("NodePortFront::heartbeat_pubkey {}", pubkey);
180        pubkey
181    }
182
183    fn network(&self) -> Network {
184        self.network
185    }
186
187    async fn tip_info(&self) -> (u32, BlockHash) {
188        self.wait_ready().await;
189        let req = msgs::TipInfo {};
190        let reply = self.signer_port.handle_message(req.as_vec()).await.expect("TipInfo failed");
191        if let Ok(Message::TipInfoReply(m)) = msgs::from_vec(reply) {
192            (m.height, m.block_hash)
193        } else {
194            panic!("unexpected TipInfoReply");
195        }
196    }
197
198    async fn forward_watches(&self) -> (Vec<Txid>, Vec<OutPoint>) {
199        self.wait_ready().await;
200        let req = msgs::ForwardWatches {};
201        let reply =
202            self.signer_port.handle_message(req.as_vec()).await.expect("ForwardWatches failed");
203        match msgs::from_vec(reply) {
204            Ok(Message::ForwardWatchesReply(m)) => (m.txids.0, m.outpoints.0),
205            Ok(m) => panic!("unexpected {:?}", m),
206            Err(e) => panic!("unexpected error {:?}", e),
207        }
208    }
209
210    async fn reverse_watches(&self) -> (Vec<Txid>, Vec<OutPoint>) {
211        self.wait_ready().await;
212        let req = msgs::ReverseWatches {};
213        let reply =
214            self.signer_port.handle_message(req.as_vec()).await.expect("ReverseWatches failed");
215        match msgs::from_vec(reply) {
216            Ok(Message::ReverseWatchesReply(m)) => (m.txids.0, m.outpoints.0),
217            Ok(m) => panic!("unexpected {:?}", m),
218            Err(e) => panic!("unexpected error {:?}", e),
219        }
220    }
221
222    async fn add_block(&self, header: BlockHeader, proof: TxoProof) {
223        self.wait_ready().await;
224
225        let proof = self.maybe_stream_block(proof).await;
226
227        let req = msgs::AddBlock {
228            header: Octets(serialize(&header)),
229            unspent_proof: Some(DebugTxoProof(proof)),
230        };
231        let reply = self.signer_port.handle_message(req.as_vec()).await.expect("AddBlock failed");
232        match msgs::from_vec(reply) {
233            Ok(Message::AddBlockReply(_)) => return,
234            Ok(Message::SignerError(msgs::SignerError { code, message })) => match code {
235                msgs::CODE_ORPHAN_BLOCK => {
236                    warn!("signer returned an OrphanBlock error: {:?}", message);
237                    return;
238                }
239                _ => panic!("NodePortFront can't handle error code {}", code),
240            },
241            _ => panic!("unexpected AddBlockReply"),
242        }
243    }
244
245    async fn remove_block(&self, proof: TxoProof, prev_headers: Headers) {
246        self.wait_ready().await;
247
248        let proof = self.maybe_stream_block(proof).await;
249
250        let req = msgs::RemoveBlock {
251            unspent_proof: Some(LargeOctets(serialize(&proof))),
252            prev_block_header: prev_headers.0,
253            prev_filter_header: prev_headers.1,
254        };
255        let reply =
256            self.signer_port.handle_message(req.as_vec()).await.expect("RemoveBlock failed");
257        if let Ok(Message::RemoveBlockReply(_)) = msgs::from_vec(reply) {
258            return;
259        } else {
260            panic!("unexpected RemoveBlockReply");
261        }
262    }
263
264    async fn beat(&self) -> SignedHeartbeat {
265        self.wait_ready().await;
266        let req = msgs::GetHeartbeat {};
267        let reply =
268            self.signer_port.handle_message(req.as_vec()).await.expect("GetHeartbeat failed");
269        if let Ok(Message::GetHeartbeatReply(m)) = msgs::from_vec(reply) {
270            let mut ser_hb = m.heartbeat.0;
271            serde_bolt::from_vec(&mut ser_hb).expect("bad heartbeat")
272        } else {
273            panic!("unexpected GetHeartbeatReply");
274        }
275    }
276}