1use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use async_trait::async_trait;
8
9use bitcoin::bip32::ExtendedPubKey;
10use bitcoin::blockdata::block::Header as BlockHeader;
11use bitcoin::consensus::serialize;
12use bitcoin::secp256k1::PublicKey;
13use bitcoin::{BlockHash, Network, OutPoint, Txid};
14use lightning_signer::bitcoin;
15
16use vls_frontend::{ChainTrack, ChainTrackDirectory};
17use vls_protocol::msgs::{self, DebugTxoProof, Message, SerBolt};
18use vls_protocol::serde_bolt::{self, LargeOctets, Octets};
19use vls_protocol_client::SignerPort;
20
21use lightning_signer::chain::tracker::Headers;
22use lightning_signer::node::SignedHeartbeat;
23use lightning_signer::txoo::proof::TxoProof;
24use log::*;
25
/// A chain-tracker directory built on top of a single signer port.
///
/// [`SignerPortFront::new`] fills `trackers` with one `NodePortFront` that
/// talks to the same `signer_port`.
pub struct SignerPortFront {
    /// The port used to exchange messages with the signer.
    pub signer_port: Arc<dyn SignerPort>,
    /// The Bitcoin network this front was created for.
    pub network: Network,
    /// The chain trackers handed out by the `ChainTrackDirectory` implementation.
    pub trackers: Vec<Arc<dyn ChainTrack>>,
}
32
33impl SignerPortFront {
34 pub fn new(signer_port: Arc<dyn SignerPort>, network: Network) -> Self {
35 let front = NodePortFront::new(signer_port.clone(), network);
36 let trackers = vec![Arc::new(front) as Arc<dyn ChainTrack>];
37 SignerPortFront { signer_port, network, trackers }
38 }
39}
40
41#[async_trait]
42impl ChainTrackDirectory for SignerPortFront {
43 fn tracker(&self, _node_id: &PublicKey) -> Arc<dyn ChainTrack> {
44 unimplemented!()
45 }
46
47 async fn trackers(&self) -> Vec<Arc<dyn ChainTrack>> {
48 self.trackers.clone()
49 }
50}
51
/// Public keys learned from the signer's `NodeInfoReply` and cached by
/// `NodePortFront`.
#[derive(Clone)]
struct NodeKeys {
    /// The node id parsed from the reply's `node_id` field.
    node_id: PublicKey,
    /// The public key of the BIP32 extended public key carried in the reply.
    heartbeat_pubkey: PublicKey,
}
57
/// A `ChainTrack` implementation that forwards every operation to a signer
/// over a `SignerPort`.
pub(crate) struct NodePortFront {
    /// The port used to exchange messages with the signer.
    signer_port: Arc<dyn SignerPort>,
    /// The Bitcoin network reported by `ChainTrack::network`.
    network: Network,
    /// Keys fetched from the signer; `None` until `populate_keys` has run.
    node_keys: Mutex<Option<NodeKeys>>,
    /// Maximum number of bytes of a serialized block sent per `BlockChunk` message.
    block_chunk_size: usize,
}
65
/// Number of readiness polls between "waiting" log lines in `wait_ready`.
/// With the 100 ms poll period used there, this is roughly one line every 10 s.
const LOG_INTERVAL: u64 = 100;
67
/// Default chunk size, in bytes, used when streaming a serialized block to the
/// signer in `BlockChunk` messages.
const BLOCK_CHUNK_SIZE: usize = 8192;
70
71impl NodePortFront {
72 fn new(signer_port: Arc<dyn SignerPort>, network: Network) -> Self {
73 debug!("NodePortFront::new network: {}", network);
74 let test_streaming = std::env::var("VLS_CHAINFOLLOWER_TEST_STREAMING")
75 .map(|s| s == "1" || s == "true")
76 .unwrap_or(false);
77 let block_chunk_size = if test_streaming {
78 1223
80 } else {
81 BLOCK_CHUNK_SIZE
82 };
83
84 Self { signer_port, network, node_keys: Mutex::new(None), block_chunk_size }
85 }
86
87 async fn wait_ready(&self) {
88 let mut attempt: u64 = 0;
89 while !self.signer_port.is_ready() {
90 if attempt % LOG_INTERVAL == 0 {
91 info!("waiting for signer_port to be ready");
92 }
93 attempt += 1;
94 tokio::time::sleep(Duration::from_millis(100)).await;
95 }
96 if attempt > 0 {
97 info!("signer_port is now ready");
98 }
99 }
100
101 async fn populate_keys(&self) -> NodeKeys {
102 self.wait_ready().await;
103 let reply = self
104 .signer_port
105 .handle_message(msgs::NodeInfo {}.as_vec())
106 .await
107 .expect("NodeInfo failed");
108 if let Ok(Message::NodeInfoReply(m)) = msgs::from_vec(reply) {
109 let xpubkey = ExtendedPubKey::decode(&m.bip32.0).expect("NodeInfoReply bip32 xpubkey");
110 let heartbeat_pubkey = xpubkey.public_key;
111 let node_id = PublicKey::from_slice(&m.node_id.0).expect("NodeInfoReply node_id");
112 let mut lock = self.node_keys.lock().unwrap();
113 let keys = NodeKeys { node_id, heartbeat_pubkey };
114 *lock = Some(keys.clone());
115 return keys;
116 } else {
117 panic!("unexpected NodeInfoReply");
118 }
119 }
120
121 async fn maybe_stream_block(&self, proof: TxoProof) -> TxoProof {
122 let (proof, block_opt) = proof.take_block();
124 if let Some(block) = block_opt {
125 let block_hash = block.block_hash();
126 let bytes = serialize(&block);
127 let mut offset = 0;
128 for chunk in bytes.chunks(self.block_chunk_size) {
129 let req =
130 msgs::BlockChunk { hash: block_hash, offset, content: Octets(chunk.to_vec()) };
131 let reply_bytes =
132 self.signer_port.handle_message(req.as_vec()).await.expect("BlockChunk failed");
133 let result = msgs::from_vec(reply_bytes);
134 match result {
135 Ok(Message::BlockChunkReply(_)) => {}
136 _ => panic!("unexpected {:?} when looking for BlockChunkReply", result),
137 }
138 offset += chunk.len() as u32;
139 }
140 }
141 proof
142 }
143}
144
145#[async_trait]
146impl ChainTrack for NodePortFront {
147 fn log_prefix(&self) -> String {
148 let lock = self.node_keys.lock().unwrap();
149 if let Some(nk) = lock.as_ref() {
150 let id = nk.node_id.serialize().to_vec();
151 format!("tracker {}", hex::encode(&id[0..4]))
152 } else {
153 format!("tracker")
154 }
155 }
156
157 async fn id(&self) -> Vec<u8> {
158 {
159 let lock = self.node_keys.lock().unwrap();
160 if let Some(nk) = lock.as_ref() {
161 return nk.node_id.serialize().to_vec();
162 }
163 }
164 let keys = self.populate_keys().await;
165 let idvec = keys.node_id.serialize().to_vec();
166 debug!("NodePortFront::id {}", hex::encode(&idvec));
167 idvec
168 }
169
170 async fn heartbeat_pubkey(&self) -> PublicKey {
171 {
172 let lock = self.node_keys.lock().unwrap();
173 if let Some(nk) = lock.as_ref() {
174 return nk.heartbeat_pubkey.clone();
175 }
176 }
177 let keys = self.populate_keys().await;
178 let pubkey = keys.heartbeat_pubkey;
179 debug!("NodePortFront::heartbeat_pubkey {}", pubkey);
180 pubkey
181 }
182
183 fn network(&self) -> Network {
184 self.network
185 }
186
187 async fn tip_info(&self) -> (u32, BlockHash) {
188 self.wait_ready().await;
189 let req = msgs::TipInfo {};
190 let reply = self.signer_port.handle_message(req.as_vec()).await.expect("TipInfo failed");
191 if let Ok(Message::TipInfoReply(m)) = msgs::from_vec(reply) {
192 (m.height, m.block_hash)
193 } else {
194 panic!("unexpected TipInfoReply");
195 }
196 }
197
198 async fn forward_watches(&self) -> (Vec<Txid>, Vec<OutPoint>) {
199 self.wait_ready().await;
200 let req = msgs::ForwardWatches {};
201 let reply =
202 self.signer_port.handle_message(req.as_vec()).await.expect("ForwardWatches failed");
203 match msgs::from_vec(reply) {
204 Ok(Message::ForwardWatchesReply(m)) => (m.txids.0, m.outpoints.0),
205 Ok(m) => panic!("unexpected {:?}", m),
206 Err(e) => panic!("unexpected error {:?}", e),
207 }
208 }
209
210 async fn reverse_watches(&self) -> (Vec<Txid>, Vec<OutPoint>) {
211 self.wait_ready().await;
212 let req = msgs::ReverseWatches {};
213 let reply =
214 self.signer_port.handle_message(req.as_vec()).await.expect("ReverseWatches failed");
215 match msgs::from_vec(reply) {
216 Ok(Message::ReverseWatchesReply(m)) => (m.txids.0, m.outpoints.0),
217 Ok(m) => panic!("unexpected {:?}", m),
218 Err(e) => panic!("unexpected error {:?}", e),
219 }
220 }
221
222 async fn add_block(&self, header: BlockHeader, proof: TxoProof) {
223 self.wait_ready().await;
224
225 let proof = self.maybe_stream_block(proof).await;
226
227 let req = msgs::AddBlock {
228 header: Octets(serialize(&header)),
229 unspent_proof: Some(DebugTxoProof(proof)),
230 };
231 let reply = self.signer_port.handle_message(req.as_vec()).await.expect("AddBlock failed");
232 match msgs::from_vec(reply) {
233 Ok(Message::AddBlockReply(_)) => return,
234 Ok(Message::SignerError(msgs::SignerError { code, message })) => match code {
235 msgs::CODE_ORPHAN_BLOCK => {
236 warn!("signer returned an OrphanBlock error: {:?}", message);
237 return;
238 }
239 _ => panic!("NodePortFront can't handle error code {}", code),
240 },
241 _ => panic!("unexpected AddBlockReply"),
242 }
243 }
244
245 async fn remove_block(&self, proof: TxoProof, prev_headers: Headers) {
246 self.wait_ready().await;
247
248 let proof = self.maybe_stream_block(proof).await;
249
250 let req = msgs::RemoveBlock {
251 unspent_proof: Some(LargeOctets(serialize(&proof))),
252 prev_block_header: prev_headers.0,
253 prev_filter_header: prev_headers.1,
254 };
255 let reply =
256 self.signer_port.handle_message(req.as_vec()).await.expect("RemoveBlock failed");
257 if let Ok(Message::RemoveBlockReply(_)) = msgs::from_vec(reply) {
258 return;
259 } else {
260 panic!("unexpected RemoveBlockReply");
261 }
262 }
263
264 async fn beat(&self) -> SignedHeartbeat {
265 self.wait_ready().await;
266 let req = msgs::GetHeartbeat {};
267 let reply =
268 self.signer_port.handle_message(req.as_vec()).await.expect("GetHeartbeat failed");
269 if let Ok(Message::GetHeartbeatReply(m)) = msgs::from_vec(reply) {
270 let mut ser_hb = m.heartbeat.0;
271 serde_bolt::from_vec(&mut ser_hb).expect("bad heartbeat")
272 } else {
273 panic!("unexpected GetHeartbeatReply");
274 }
275 }
276}