1use {
2 crate::consensus::{
3 tower1_14_11::Tower1_14_11, tower1_7_14::SavedTower1_7_14, Result, Tower, TowerError,
4 TowerVersions,
5 },
6 solana_pubkey::Pubkey,
7 solana_signature::Signature,
8 solana_signer::Signer,
9 std::{
10 fs::{self, File},
11 io::{self, BufReader},
12 path::PathBuf,
13 },
14};
15
16#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
17#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
18pub enum SavedTowerVersions {
19 V1_17_14(SavedTower1_7_14),
20 Current(SavedTower),
21}
22
23impl SavedTowerVersions {
24 fn try_into_tower(&self, node_pubkey: &Pubkey) -> Result<Tower> {
25 assert_eq!(self.pubkey(), Pubkey::default());
27
28 let tv = match self {
29 SavedTowerVersions::V1_17_14(t) => {
30 if !t.signature.verify(node_pubkey.as_ref(), &t.data) {
31 return Err(TowerError::InvalidSignature);
32 }
33 bincode::deserialize(&t.data).map(TowerVersions::V1_7_14)
34 }
35 SavedTowerVersions::Current(t) => {
36 if !t.signature.verify(node_pubkey.as_ref(), &t.data) {
37 return Err(TowerError::InvalidSignature);
38 }
39 bincode::deserialize(&t.data).map(TowerVersions::V1_14_11)
40 }
41 };
42 tv.map_err(|e| e.into()).and_then(|tv: TowerVersions| {
43 let tower = tv.convert_to_current();
44 if tower.node_pubkey != *node_pubkey {
45 return Err(TowerError::WrongTower(format!(
46 "node_pubkey is {:?} but found tower for {:?}",
47 node_pubkey, tower.node_pubkey
48 )));
49 }
50 Ok(tower)
51 })
52 }
53
54 fn serialize_into(&self, file: &mut File) -> Result<()> {
55 bincode::serialize_into(file, self).map_err(|e| e.into())
56 }
57
58 fn pubkey(&self) -> Pubkey {
59 match self {
60 SavedTowerVersions::V1_17_14(t) => t.node_pubkey,
61 SavedTowerVersions::Current(t) => t.node_pubkey,
62 }
63 }
64}
65
66impl From<SavedTower> for SavedTowerVersions {
67 fn from(tower: SavedTower) -> SavedTowerVersions {
68 SavedTowerVersions::Current(tower)
69 }
70}
71
72impl From<SavedTower1_7_14> for SavedTowerVersions {
73 fn from(tower: SavedTower1_7_14) -> SavedTowerVersions {
74 SavedTowerVersions::V1_17_14(tower)
75 }
76}
77
78#[cfg_attr(
79 feature = "frozen-abi",
80 derive(AbiExample),
81 frozen_abi(digest = "GqJW8vVvSkSZwTJE6x6MFFhi7kcU6mqst8PF7493h2hk")
82)]
83#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
84pub struct SavedTower {
85 signature: Signature,
86 #[serde(with = "serde_bytes")]
87 data: Vec<u8>,
88 #[serde(skip)]
89 node_pubkey: Pubkey,
90}
91
92impl SavedTower {
93 pub fn new<T: Signer>(tower: &Tower, keypair: &T) -> Result<Self> {
94 let node_pubkey = keypair.pubkey();
95 if tower.node_pubkey != node_pubkey {
96 return Err(TowerError::WrongTower(format!(
97 "node_pubkey is {:?} but found tower for {:?}",
98 node_pubkey, tower.node_pubkey
99 )));
100 }
101
102 let tower: Tower1_14_11 = tower.clone().into();
104
105 let data = bincode::serialize(&tower)?;
106 let signature = keypair.sign_message(&data);
107 Ok(Self {
108 signature,
109 data,
110 node_pubkey,
111 })
112 }
113}
114
115pub trait TowerStorage: Sync + Send {
116 fn load(&self, node_pubkey: &Pubkey) -> Result<Tower>;
117 fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()>;
118}
119
/// Tower storage that keeps nothing: `store` is a no-op and `load` always fails.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct NullTowerStorage {}
122
123impl TowerStorage for NullTowerStorage {
124 fn load(&self, _node_pubkey: &Pubkey) -> Result<Tower> {
125 Err(TowerError::IoError(io::Error::other(
126 "NullTowerStorage::load() not available",
127 )))
128 }
129
130 fn store(&self, _saved_tower: &SavedTowerVersions) -> Result<()> {
131 Ok(())
132 }
133}
134
/// Tower storage backed by files inside a directory.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileTowerStorage {
    /// Directory that the tower files are joined onto.
    pub tower_path: PathBuf,
}
139
140impl FileTowerStorage {
141 pub fn new(tower_path: PathBuf) -> Self {
142 Self { tower_path }
143 }
144
145 pub fn old_filename(&self, node_pubkey: &Pubkey) -> PathBuf {
147 self.tower_path
148 .join(format!("tower-{node_pubkey}"))
149 .with_extension("bin")
150 }
151
152 pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf {
153 self.tower_path
154 .join(format!("tower-1_9-{node_pubkey}"))
155 .with_extension("bin")
156 }
157
158 #[cfg(test)]
159 fn store_old(&self, saved_tower: &SavedTower1_7_14) -> Result<()> {
160 let pubkey = saved_tower.node_pubkey;
161 let filename = self.old_filename(&pubkey);
162 trace!("store: {}", filename.display());
163 let new_filename = filename.with_extension("bin.new");
164
165 {
166 let file = File::create(&new_filename)?;
168 bincode::serialize_into(file, saved_tower)?;
169 }
171 fs::rename(&new_filename, &filename)?;
172 Ok(())
174 }
175}
176
177impl TowerStorage for FileTowerStorage {
178 fn load(&self, node_pubkey: &Pubkey) -> Result<Tower> {
179 let filename = self.filename(node_pubkey);
180 trace!("load {}", filename.display());
181
182 fs::create_dir_all(filename.parent().unwrap())?;
184
185 if let Ok(file) = File::open(&filename) {
186 let mut stream = BufReader::new(file);
188
189 bincode::deserialize_from(&mut stream)
190 .map_err(|e| e.into())
191 .and_then(|t: SavedTowerVersions| t.try_into_tower(node_pubkey))
192 } else {
193 let file = File::open(self.old_filename(node_pubkey))?;
195 let mut stream = BufReader::new(file);
196 bincode::deserialize_from(&mut stream)
197 .map_err(|e| e.into())
198 .and_then(|t: SavedTower1_7_14| {
199 SavedTowerVersions::from(t).try_into_tower(node_pubkey)
200 })
201 }
202 }
203
204 fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> {
205 let pubkey = saved_tower.pubkey();
206 let filename = self.filename(&pubkey);
207 trace!("store: {}", filename.display());
208 let new_filename = filename.with_extension("bin.new");
209
210 {
211 let mut file = File::create(&new_filename)?;
213 saved_tower.serialize_into(&mut file)?;
214 }
216 fs::rename(&new_filename, &filename)?;
217 Ok(())
219 }
220}
221
222pub struct EtcdTowerStorage {
223 client: tokio::sync::Mutex<etcd_client::Client>,
224 instance_id: [u8; 8],
225 runtime: tokio::runtime::Runtime,
226}
227
/// TLS settings handed to [`EtcdTowerStorage::new`].
pub struct EtcdTlsConfig {
    /// Domain name passed to the etcd client's TLS options.
    pub domain_name: String,
    /// CA certificate bytes; consumed via `Certificate::from_pem`.
    pub ca_certificate: Vec<u8>,
    /// Client certificate bytes; consumed via `Identity::from_pem`.
    pub identity_certificate: Vec<u8>,
    /// Client private key bytes; consumed via `Identity::from_pem`.
    pub identity_private_key: Vec<u8>,
}
234
235impl EtcdTowerStorage {
236 pub fn new<E: AsRef<str>, S: AsRef<[E]>>(
237 endpoints: S,
238 tls_config: Option<EtcdTlsConfig>,
239 ) -> Result<Self> {
240 let runtime = tokio::runtime::Builder::new_current_thread()
241 .enable_io()
242 .enable_time()
243 .build()
244 .unwrap();
245
246 let client = runtime
247 .block_on(etcd_client::Client::connect(
248 endpoints,
249 tls_config.map(|tls_config| {
250 etcd_client::ConnectOptions::default().with_tls(
251 etcd_client::TlsOptions::new()
252 .domain_name(tls_config.domain_name)
253 .ca_certificate(etcd_client::Certificate::from_pem(
254 tls_config.ca_certificate,
255 ))
256 .identity(etcd_client::Identity::from_pem(
257 tls_config.identity_certificate,
258 tls_config.identity_private_key,
259 )),
260 )
261 }),
262 ))
263 .map_err(Self::etdc_to_tower_error)?;
264
265 Ok(Self {
266 client: tokio::sync::Mutex::new(client),
267 instance_id: solana_time_utils::timestamp().to_le_bytes(),
268 runtime,
269 })
270 }
271
272 fn get_keys(node_pubkey: &Pubkey) -> (String, String) {
273 let instance_key = format!("{node_pubkey}/instance");
274 let tower_key = format!("{node_pubkey}/tower");
275 (instance_key, tower_key)
276 }
277
278 fn etdc_to_tower_error(error: etcd_client::Error) -> TowerError {
279 TowerError::IoError(io::Error::other(error.to_string()))
280 }
281}
282
283impl TowerStorage for EtcdTowerStorage {
284 fn load(&self, node_pubkey: &Pubkey) -> Result<Tower> {
285 let (instance_key, tower_key) = Self::get_keys(node_pubkey);
286
287 let txn = etcd_client::Txn::new().and_then(vec![etcd_client::TxnOp::put(
288 instance_key.clone(),
289 self.instance_id,
290 None,
291 )]);
292 self.runtime
293 .block_on(async { self.client.lock().await.txn(txn).await })
294 .map_err(|err| {
295 error!("Failed to acquire etcd instance lock: {err}");
296 Self::etdc_to_tower_error(err)
297 })?;
298
299 let txn = etcd_client::Txn::new()
300 .when(vec![etcd_client::Compare::value(
301 instance_key,
302 etcd_client::CompareOp::Equal,
303 self.instance_id,
304 )])
305 .and_then(vec![etcd_client::TxnOp::get(tower_key, None)]);
306
307 let response = self
308 .runtime
309 .block_on(async { self.client.lock().await.txn(txn).await })
310 .map_err(|err| {
311 error!("Failed to read etcd saved tower: {err}");
312 Self::etdc_to_tower_error(err)
313 })?;
314
315 if !response.succeeded() {
316 return Err(TowerError::IoError(io::Error::other(format!(
317 "Lost etcd instance lock for {node_pubkey}"
318 ))));
319 }
320
321 for op_response in response.op_responses() {
322 if let etcd_client::TxnOpResponse::Get(get_response) = op_response {
323 if let Some(kv) = get_response.kvs().first() {
324 return bincode::deserialize_from(kv.value())
325 .map_err(|e| e.into())
326 .and_then(|t: SavedTowerVersions| t.try_into_tower(node_pubkey));
327 }
328 }
329 }
330
331 Err(TowerError::IoError(io::Error::other(
333 "Saved tower response missing".to_string(),
334 )))
335 }
336
337 fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> {
338 let (instance_key, tower_key) = Self::get_keys(&saved_tower.pubkey());
339
340 let txn = etcd_client::Txn::new()
341 .when(vec![etcd_client::Compare::value(
342 instance_key,
343 etcd_client::CompareOp::Equal,
344 self.instance_id,
345 )])
346 .and_then(vec![etcd_client::TxnOp::put(
347 tower_key,
348 bincode::serialize(&saved_tower)?,
349 None,
350 )]);
351
352 let response = self
353 .runtime
354 .block_on(async { self.client.lock().await.txn(txn).await })
355 .map_err(|err| {
356 error!("Failed to write etcd saved tower: {err}");
357 err
358 })
359 .map_err(Self::etdc_to_tower_error)?;
360
361 if !response.succeeded() {
362 return Err(TowerError::IoError(io::Error::other(format!(
363 "Lost etcd instance lock for {}",
364 saved_tower.pubkey()
365 ))));
366 }
367 Ok(())
368 }
369}
370
#[cfg(test)]
pub mod test {
    use {
        super::*,
        crate::consensus::{
            tower1_7_14::{SavedTower1_7_14, Tower1_7_14},
            BlockhashStatus, Tower,
        },
        solana_hash::Hash,
        solana_keypair::Keypair,
        solana_vote::vote_transaction::VoteTransaction,
        solana_vote_program::vote_state::{
            BlockTimestamp, LandedVote, Vote, VoteState1_14_11, VoteStateV3, MAX_LOCKOUT_HISTORY,
        },
        tempfile::TempDir,
    };

    /// A tower written to the legacy file location is picked up by
    /// `Tower::restore` through `FileTowerStorage`.
    #[test]
    fn test_tower_migration() {
        let identity_keypair = Keypair::new();
        let node_pubkey = identity_keypair.pubkey();

        // Keep the temp dir alive for the whole test; it is removed on drop.
        let tower_path = TempDir::new().unwrap();
        let tower_storage = FileTowerStorage::new(tower_path.path().to_path_buf());

        let mut vote_state = VoteStateV3::default();
        vote_state
            .votes
            .resize(MAX_LOCKOUT_HISTORY, LandedVote::default());
        vote_state.root_slot = Some(1);

        let vote = Vote::new(vec![1, 2, 3, 4], Hash::default());

        let old_tower = Tower1_7_14 {
            node_pubkey,
            threshold_depth: 10,
            threshold_size: 0.9,
            vote_state: VoteState1_14_11::from(vote_state),
            last_vote: vote.clone(),
            last_timestamp: BlockTimestamp::default(),
            last_vote_tx_blockhash: BlockhashStatus::Uninitialized,
            stray_restored_slot: Some(2),
            last_switch_threshold_check: Option::default(),
        };

        // Only the legacy file is written; no current-format file exists.
        let saved_tower = SavedTower1_7_14::new(&old_tower, &identity_keypair).unwrap();
        tower_storage.store_old(&saved_tower).unwrap();

        let restored = Tower::restore(&tower_storage, &node_pubkey).unwrap();
        assert_eq!(restored.node_pubkey, old_tower.node_pubkey);
        assert_eq!(restored.last_vote(), VoteTransaction::from(vote));
        assert_eq!(restored.vote_state.root_slot, Some(1));
        assert_eq!(restored.stray_restored_slot(), None);
    }
}