solana_core/consensus/
tower_storage.rs

1use {
2    crate::consensus::{
3        tower1_14_11::Tower1_14_11, tower1_7_14::SavedTower1_7_14, Result, Tower, TowerError,
4        TowerVersions,
5    },
6    solana_pubkey::Pubkey,
7    solana_signature::Signature,
8    solana_signer::Signer,
9    std::{
10        fs::{self, File},
11        io::{self, BufReader},
12        path::PathBuf,
13    },
14};
15
16#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
17#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
18pub enum SavedTowerVersions {
19    V1_17_14(SavedTower1_7_14),
20    Current(SavedTower),
21}
22
23impl SavedTowerVersions {
24    fn try_into_tower(&self, node_pubkey: &Pubkey) -> Result<Tower> {
25        // This method assumes that `self` was just deserialized
26        assert_eq!(self.pubkey(), Pubkey::default());
27
28        let tv = match self {
29            SavedTowerVersions::V1_17_14(t) => {
30                if !t.signature.verify(node_pubkey.as_ref(), &t.data) {
31                    return Err(TowerError::InvalidSignature);
32                }
33                bincode::deserialize(&t.data).map(TowerVersions::V1_7_14)
34            }
35            SavedTowerVersions::Current(t) => {
36                if !t.signature.verify(node_pubkey.as_ref(), &t.data) {
37                    return Err(TowerError::InvalidSignature);
38                }
39                bincode::deserialize(&t.data).map(TowerVersions::V1_14_11)
40            }
41        };
42        tv.map_err(|e| e.into()).and_then(|tv: TowerVersions| {
43            let tower = tv.convert_to_current();
44            if tower.node_pubkey != *node_pubkey {
45                return Err(TowerError::WrongTower(format!(
46                    "node_pubkey is {:?} but found tower for {:?}",
47                    node_pubkey, tower.node_pubkey
48                )));
49            }
50            Ok(tower)
51        })
52    }
53
54    fn serialize_into(&self, file: &mut File) -> Result<()> {
55        bincode::serialize_into(file, self).map_err(|e| e.into())
56    }
57
58    fn pubkey(&self) -> Pubkey {
59        match self {
60            SavedTowerVersions::V1_17_14(t) => t.node_pubkey,
61            SavedTowerVersions::Current(t) => t.node_pubkey,
62        }
63    }
64}
65
66impl From<SavedTower> for SavedTowerVersions {
67    fn from(tower: SavedTower) -> SavedTowerVersions {
68        SavedTowerVersions::Current(tower)
69    }
70}
71
72impl From<SavedTower1_7_14> for SavedTowerVersions {
73    fn from(tower: SavedTower1_7_14) -> SavedTowerVersions {
74        SavedTowerVersions::V1_17_14(tower)
75    }
76}
77
78#[cfg_attr(
79    feature = "frozen-abi",
80    derive(AbiExample),
81    frozen_abi(digest = "GqJW8vVvSkSZwTJE6x6MFFhi7kcU6mqst8PF7493h2hk")
82)]
83#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
84pub struct SavedTower {
85    signature: Signature,
86    #[serde(with = "serde_bytes")]
87    data: Vec<u8>,
88    #[serde(skip)]
89    node_pubkey: Pubkey,
90}
91
92impl SavedTower {
93    pub fn new<T: Signer>(tower: &Tower, keypair: &T) -> Result<Self> {
94        let node_pubkey = keypair.pubkey();
95        if tower.node_pubkey != node_pubkey {
96            return Err(TowerError::WrongTower(format!(
97                "node_pubkey is {:?} but found tower for {:?}",
98                node_pubkey, tower.node_pubkey
99            )));
100        }
101
102        // SavedTower always stores its data in 1_14_11 format
103        let tower: Tower1_14_11 = tower.clone().into();
104
105        let data = bincode::serialize(&tower)?;
106        let signature = keypair.sign_message(&data);
107        Ok(Self {
108            signature,
109            data,
110            node_pubkey,
111        })
112    }
113}
114
115pub trait TowerStorage: Sync + Send {
116    fn load(&self, node_pubkey: &Pubkey) -> Result<Tower>;
117    fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()>;
118}
119
120#[derive(Debug, Default, Clone, PartialEq, Eq)]
121pub struct NullTowerStorage {}
122
123impl TowerStorage for NullTowerStorage {
124    fn load(&self, _node_pubkey: &Pubkey) -> Result<Tower> {
125        Err(TowerError::IoError(io::Error::other(
126            "NullTowerStorage::load() not available",
127        )))
128    }
129
130    fn store(&self, _saved_tower: &SavedTowerVersions) -> Result<()> {
131        Ok(())
132    }
133}
134
135#[derive(Debug, Default, Clone, PartialEq, Eq)]
136pub struct FileTowerStorage {
137    pub tower_path: PathBuf,
138}
139
140impl FileTowerStorage {
141    pub fn new(tower_path: PathBuf) -> Self {
142        Self { tower_path }
143    }
144
145    // Old filename for towers pre 1.9 (VoteStateUpdate)
146    pub fn old_filename(&self, node_pubkey: &Pubkey) -> PathBuf {
147        self.tower_path
148            .join(format!("tower-{node_pubkey}"))
149            .with_extension("bin")
150    }
151
152    pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf {
153        self.tower_path
154            .join(format!("tower-1_9-{node_pubkey}"))
155            .with_extension("bin")
156    }
157
158    #[cfg(test)]
159    fn store_old(&self, saved_tower: &SavedTower1_7_14) -> Result<()> {
160        let pubkey = saved_tower.node_pubkey;
161        let filename = self.old_filename(&pubkey);
162        trace!("store: {}", filename.display());
163        let new_filename = filename.with_extension("bin.new");
164
165        {
166            // overwrite anything if exists
167            let file = File::create(&new_filename)?;
168            bincode::serialize_into(file, saved_tower)?;
169            // file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster!
170        }
171        fs::rename(&new_filename, &filename)?;
172        // self.path.parent().sync_all() hurts performance same as the above sync
173        Ok(())
174    }
175}
176
177impl TowerStorage for FileTowerStorage {
178    fn load(&self, node_pubkey: &Pubkey) -> Result<Tower> {
179        let filename = self.filename(node_pubkey);
180        trace!("load {}", filename.display());
181
182        // Ensure to create parent dir here, because restore() precedes save() always
183        fs::create_dir_all(filename.parent().unwrap())?;
184
185        if let Ok(file) = File::open(&filename) {
186            // New format
187            let mut stream = BufReader::new(file);
188
189            bincode::deserialize_from(&mut stream)
190                .map_err(|e| e.into())
191                .and_then(|t: SavedTowerVersions| t.try_into_tower(node_pubkey))
192        } else {
193            // Old format
194            let file = File::open(self.old_filename(node_pubkey))?;
195            let mut stream = BufReader::new(file);
196            bincode::deserialize_from(&mut stream)
197                .map_err(|e| e.into())
198                .and_then(|t: SavedTower1_7_14| {
199                    SavedTowerVersions::from(t).try_into_tower(node_pubkey)
200                })
201        }
202    }
203
204    fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> {
205        let pubkey = saved_tower.pubkey();
206        let filename = self.filename(&pubkey);
207        trace!("store: {}", filename.display());
208        let new_filename = filename.with_extension("bin.new");
209
210        {
211            // overwrite anything if exists
212            let mut file = File::create(&new_filename)?;
213            saved_tower.serialize_into(&mut file)?;
214            // file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster!
215        }
216        fs::rename(&new_filename, &filename)?;
217        // self.path.parent().sync_all() hurts performance same as the above sync
218        Ok(())
219    }
220}
221
222pub struct EtcdTowerStorage {
223    client: tokio::sync::Mutex<etcd_client::Client>,
224    instance_id: [u8; 8],
225    runtime: tokio::runtime::Runtime,
226}
227
228pub struct EtcdTlsConfig {
229    pub domain_name: String,
230    pub ca_certificate: Vec<u8>,
231    pub identity_certificate: Vec<u8>,
232    pub identity_private_key: Vec<u8>,
233}
234
235impl EtcdTowerStorage {
236    pub fn new<E: AsRef<str>, S: AsRef<[E]>>(
237        endpoints: S,
238        tls_config: Option<EtcdTlsConfig>,
239    ) -> Result<Self> {
240        let runtime = tokio::runtime::Builder::new_current_thread()
241            .enable_io()
242            .enable_time()
243            .build()
244            .unwrap();
245
246        let client = runtime
247            .block_on(etcd_client::Client::connect(
248                endpoints,
249                tls_config.map(|tls_config| {
250                    etcd_client::ConnectOptions::default().with_tls(
251                        etcd_client::TlsOptions::new()
252                            .domain_name(tls_config.domain_name)
253                            .ca_certificate(etcd_client::Certificate::from_pem(
254                                tls_config.ca_certificate,
255                            ))
256                            .identity(etcd_client::Identity::from_pem(
257                                tls_config.identity_certificate,
258                                tls_config.identity_private_key,
259                            )),
260                    )
261                }),
262            ))
263            .map_err(Self::etdc_to_tower_error)?;
264
265        Ok(Self {
266            client: tokio::sync::Mutex::new(client),
267            instance_id: solana_time_utils::timestamp().to_le_bytes(),
268            runtime,
269        })
270    }
271
272    fn get_keys(node_pubkey: &Pubkey) -> (String, String) {
273        let instance_key = format!("{node_pubkey}/instance");
274        let tower_key = format!("{node_pubkey}/tower");
275        (instance_key, tower_key)
276    }
277
278    fn etdc_to_tower_error(error: etcd_client::Error) -> TowerError {
279        TowerError::IoError(io::Error::other(error.to_string()))
280    }
281}
282
283impl TowerStorage for EtcdTowerStorage {
284    fn load(&self, node_pubkey: &Pubkey) -> Result<Tower> {
285        let (instance_key, tower_key) = Self::get_keys(node_pubkey);
286
287        let txn = etcd_client::Txn::new().and_then(vec![etcd_client::TxnOp::put(
288            instance_key.clone(),
289            self.instance_id,
290            None,
291        )]);
292        self.runtime
293            .block_on(async { self.client.lock().await.txn(txn).await })
294            .map_err(|err| {
295                error!("Failed to acquire etcd instance lock: {err}");
296                Self::etdc_to_tower_error(err)
297            })?;
298
299        let txn = etcd_client::Txn::new()
300            .when(vec![etcd_client::Compare::value(
301                instance_key,
302                etcd_client::CompareOp::Equal,
303                self.instance_id,
304            )])
305            .and_then(vec![etcd_client::TxnOp::get(tower_key, None)]);
306
307        let response = self
308            .runtime
309            .block_on(async { self.client.lock().await.txn(txn).await })
310            .map_err(|err| {
311                error!("Failed to read etcd saved tower: {err}");
312                Self::etdc_to_tower_error(err)
313            })?;
314
315        if !response.succeeded() {
316            return Err(TowerError::IoError(io::Error::other(format!(
317                "Lost etcd instance lock for {node_pubkey}"
318            ))));
319        }
320
321        for op_response in response.op_responses() {
322            if let etcd_client::TxnOpResponse::Get(get_response) = op_response {
323                if let Some(kv) = get_response.kvs().first() {
324                    return bincode::deserialize_from(kv.value())
325                        .map_err(|e| e.into())
326                        .and_then(|t: SavedTowerVersions| t.try_into_tower(node_pubkey));
327                }
328            }
329        }
330
331        // Should never happen...
332        Err(TowerError::IoError(io::Error::other(
333            "Saved tower response missing".to_string(),
334        )))
335    }
336
337    fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> {
338        let (instance_key, tower_key) = Self::get_keys(&saved_tower.pubkey());
339
340        let txn = etcd_client::Txn::new()
341            .when(vec![etcd_client::Compare::value(
342                instance_key,
343                etcd_client::CompareOp::Equal,
344                self.instance_id,
345            )])
346            .and_then(vec![etcd_client::TxnOp::put(
347                tower_key,
348                bincode::serialize(&saved_tower)?,
349                None,
350            )]);
351
352        let response = self
353            .runtime
354            .block_on(async { self.client.lock().await.txn(txn).await })
355            .map_err(|err| {
356                error!("Failed to write etcd saved tower: {err}");
357                err
358            })
359            .map_err(Self::etdc_to_tower_error)?;
360
361        if !response.succeeded() {
362            return Err(TowerError::IoError(io::Error::other(format!(
363                "Lost etcd instance lock for {}",
364                saved_tower.pubkey()
365            ))));
366        }
367        Ok(())
368    }
369}
370
#[cfg(test)]
pub mod test {
    use {
        super::*,
        crate::consensus::{
            tower1_7_14::{SavedTower1_7_14, Tower1_7_14},
            BlockhashStatus, Tower,
        },
        solana_hash::Hash,
        solana_keypair::Keypair,
        solana_vote::vote_transaction::VoteTransaction,
        solana_vote_program::vote_state::{
            BlockTimestamp, LandedVote, Vote, VoteState1_14_11, VoteStateV3, MAX_LOCKOUT_HISTORY,
        },
        tempfile::TempDir,
    };

    /// A tower written in the legacy format under the old filename is picked up by
    /// `Tower::restore` and converted to the current representation.
    #[test]
    fn test_tower_migration() {
        let temp_dir = TempDir::new().unwrap();
        let tower_storage = FileTowerStorage::new(temp_dir.path().to_path_buf());

        let identity_keypair = Keypair::new();
        let node_pubkey = identity_keypair.pubkey();

        let mut vote_state = VoteStateV3::default();
        vote_state.root_slot = Some(1);
        vote_state
            .votes
            .resize(MAX_LOCKOUT_HISTORY, LandedVote::default());

        let vote = Vote::new(vec![1, 2, 3, 4], Hash::default());

        let old_tower = Tower1_7_14 {
            node_pubkey,
            threshold_depth: 10,
            threshold_size: 0.9,
            vote_state: VoteState1_14_11::from(vote_state),
            last_vote: vote.clone(),
            last_timestamp: BlockTimestamp::default(),
            last_vote_tx_blockhash: BlockhashStatus::Uninitialized,
            stray_restored_slot: Some(2),
            last_switch_threshold_check: Option::default(),
        };

        // Persist in the legacy format only; no current-format file exists.
        let legacy_saved = SavedTower1_7_14::new(&old_tower, &identity_keypair).unwrap();
        tower_storage.store_old(&legacy_saved).unwrap();

        let restored = Tower::restore(&tower_storage, &node_pubkey).unwrap();
        assert_eq!(restored.node_pubkey, old_tower.node_pubkey);
        assert_eq!(restored.last_vote(), VoteTransaction::from(vote));
        assert_eq!(restored.vote_state.root_slot, Some(1));
        assert_eq!(restored.stray_restored_slot(), None);
    }
}