Skip to main content

talos_api_rs/resources/
etcd.rs

// SPDX-License-Identifier: MIT OR Apache-2.0

//! Typed wrappers for etcd operations.
//!
//! This module provides high-level types for interacting with the etcd cluster
//! running on Talos control plane nodes.
7
8use crate::api::generated::machine::{
9    EtcdAlarm as ProtoEtcdAlarm, EtcdAlarmDisarm as ProtoEtcdAlarmDisarm,
10    EtcdAlarmDisarmResponse as ProtoEtcdAlarmDisarmResponse,
11    EtcdAlarmListResponse as ProtoEtcdAlarmListResponse, EtcdDefragment as ProtoEtcdDefragment,
12    EtcdDefragmentResponse as ProtoEtcdDefragmentResponse,
13    EtcdForfeitLeadership as ProtoEtcdForfeitLeadership,
14    EtcdForfeitLeadershipRequest as ProtoEtcdForfeitLeadershipRequest,
15    EtcdForfeitLeadershipResponse as ProtoEtcdForfeitLeadershipResponse,
16    EtcdLeaveCluster as ProtoEtcdLeaveCluster,
17    EtcdLeaveClusterRequest as ProtoEtcdLeaveClusterRequest,
18    EtcdLeaveClusterResponse as ProtoEtcdLeaveClusterResponse, EtcdMember as ProtoEtcdMember,
19    EtcdMemberAlarm as ProtoEtcdMemberAlarm, EtcdMemberListRequest as ProtoEtcdMemberListRequest,
20    EtcdMemberListResponse as ProtoEtcdMemberListResponse,
21    EtcdMemberStatus as ProtoEtcdMemberStatus, EtcdMembers as ProtoEtcdMembers,
22    EtcdRemoveMemberByIdRequest as ProtoEtcdRemoveMemberByIdRequest,
23    EtcdRemoveMemberByIdResponse as ProtoEtcdRemoveMemberByIdResponse,
24    EtcdStatus as ProtoEtcdStatus, EtcdStatusResponse as ProtoEtcdStatusResponse,
25};
26
27// =============================================================================
28// EtcdMemberList
29// =============================================================================
30
/// Request to list etcd cluster members.
///
/// The default value asks for the cluster-wide member list (`query_local == false`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EtcdMemberListRequest {
    /// If true, query only the local node's view of the cluster.
    pub query_local: bool,
}
37
38impl EtcdMemberListRequest {
39    /// Create a new request to list etcd members.
40    #[must_use]
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    /// Query only the local node's view.
46    #[must_use]
47    pub fn local() -> Self {
48        Self { query_local: true }
49    }
50}
51
52impl From<EtcdMemberListRequest> for ProtoEtcdMemberListRequest {
53    fn from(req: EtcdMemberListRequest) -> Self {
54        Self {
55            query_local: req.query_local,
56        }
57    }
58}
59
/// One member of the etcd cluster, as reported by a Talos node.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct EtcdMember {
    /// Unique etcd member identifier.
    pub id: u64,
    /// Hostname the member is known by.
    pub hostname: String,
    /// URLs this member advertises to its peers.
    pub peer_urls: Vec<String>,
    /// URLs this member advertises to clients.
    pub client_urls: Vec<String>,
    /// `true` when the member is a learner.
    pub is_learner: bool,
}
74
75impl From<ProtoEtcdMember> for EtcdMember {
76    fn from(proto: ProtoEtcdMember) -> Self {
77        Self {
78            id: proto.id,
79            hostname: proto.hostname,
80            peer_urls: proto.peer_urls,
81            client_urls: proto.client_urls,
82            is_learner: proto.is_learner,
83        }
84    }
85}
86
87/// Result from a single node for member list request.
88#[derive(Debug, Clone)]
89pub struct EtcdMembersResult {
90    /// Node that returned this result.
91    pub node: Option<String>,
92    /// List of etcd members.
93    pub members: Vec<EtcdMember>,
94}
95
96impl From<ProtoEtcdMembers> for EtcdMembersResult {
97    fn from(proto: ProtoEtcdMembers) -> Self {
98        Self {
99            node: proto.metadata.map(|m| m.hostname),
100            members: proto.members.into_iter().map(EtcdMember::from).collect(),
101        }
102    }
103}
104
105/// Response from etcd member list request.
106#[derive(Debug, Clone)]
107pub struct EtcdMemberListResponse {
108    /// Results from each node.
109    pub results: Vec<EtcdMembersResult>,
110}
111
112impl From<ProtoEtcdMemberListResponse> for EtcdMemberListResponse {
113    fn from(proto: ProtoEtcdMemberListResponse) -> Self {
114        Self {
115            results: proto
116                .messages
117                .into_iter()
118                .map(EtcdMembersResult::from)
119                .collect(),
120        }
121    }
122}
123
124impl EtcdMemberListResponse {
125    /// Get all unique members across all nodes.
126    #[must_use]
127    pub fn all_members(&self) -> Vec<&EtcdMember> {
128        let mut seen_ids = std::collections::HashSet::new();
129        let mut members = Vec::new();
130
131        for result in &self.results {
132            for member in &result.members {
133                if seen_ids.insert(member.id) {
134                    members.push(member);
135                }
136            }
137        }
138
139        members
140    }
141
142    /// Find a member by hostname.
143    #[must_use]
144    pub fn find_by_hostname(&self, hostname: &str) -> Option<&EtcdMember> {
145        self.all_members()
146            .into_iter()
147            .find(|m| m.hostname == hostname)
148    }
149
150    /// Find a member by ID.
151    #[must_use]
152    pub fn find_by_id(&self, id: u64) -> Option<&EtcdMember> {
153        self.all_members().into_iter().find(|m| m.id == id)
154    }
155}
156
157// =============================================================================
158// EtcdRemoveMemberByID
159// =============================================================================
160
/// Request to remove an etcd member by ID.
///
/// Use this to remove members that no longer have an associated Talos node.
/// For nodes that are still running, use [`EtcdLeaveClusterRequest`] instead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdRemoveMemberByIdRequest {
    /// The member ID to remove.
    pub member_id: u64,
}
170
171impl EtcdRemoveMemberByIdRequest {
172    /// Create a new request to remove a member by ID.
173    #[must_use]
174    pub fn new(member_id: u64) -> Self {
175        Self { member_id }
176    }
177}
178
179impl From<EtcdRemoveMemberByIdRequest> for ProtoEtcdRemoveMemberByIdRequest {
180    fn from(req: EtcdRemoveMemberByIdRequest) -> Self {
181        Self {
182            member_id: req.member_id,
183        }
184    }
185}
186
/// Result from removing a member by ID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdRemoveMemberByIdResult {
    /// Node that processed this request.
    pub node: Option<String>,
}
193
194/// Response from removing a member by ID.
195#[derive(Debug, Clone)]
196pub struct EtcdRemoveMemberByIdResponse {
197    /// Results from each node.
198    pub results: Vec<EtcdRemoveMemberByIdResult>,
199}
200
201impl From<ProtoEtcdRemoveMemberByIdResponse> for EtcdRemoveMemberByIdResponse {
202    fn from(proto: ProtoEtcdRemoveMemberByIdResponse) -> Self {
203        Self {
204            results: proto
205                .messages
206                .into_iter()
207                .map(|m| EtcdRemoveMemberByIdResult {
208                    node: m.metadata.map(|meta| meta.hostname),
209                })
210                .collect(),
211        }
212    }
213}
214
215impl EtcdRemoveMemberByIdResponse {
216    /// Check if the operation was successful (at least one node responded).
217    #[must_use]
218    pub fn is_success(&self) -> bool {
219        !self.results.is_empty()
220    }
221}
222
223// =============================================================================
224// EtcdLeaveCluster
225// =============================================================================
226
/// Request for a node to leave the etcd cluster gracefully.
///
/// This should be called on the node that is being removed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EtcdLeaveClusterRequest;
232
233impl EtcdLeaveClusterRequest {
234    /// Create a new request.
235    #[must_use]
236    pub fn new() -> Self {
237        Self
238    }
239}
240
241impl From<EtcdLeaveClusterRequest> for ProtoEtcdLeaveClusterRequest {
242    fn from(_req: EtcdLeaveClusterRequest) -> Self {
243        Self {}
244    }
245}
246
/// Result from leaving the cluster.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdLeaveClusterResult {
    /// Node that left the cluster.
    pub node: Option<String>,
}
253
254impl From<ProtoEtcdLeaveCluster> for EtcdLeaveClusterResult {
255    fn from(proto: ProtoEtcdLeaveCluster) -> Self {
256        Self {
257            node: proto.metadata.map(|m| m.hostname),
258        }
259    }
260}
261
262/// Response from leaving the cluster.
263#[derive(Debug, Clone)]
264pub struct EtcdLeaveClusterResponse {
265    /// Results from each node.
266    pub results: Vec<EtcdLeaveClusterResult>,
267}
268
269impl From<ProtoEtcdLeaveClusterResponse> for EtcdLeaveClusterResponse {
270    fn from(proto: ProtoEtcdLeaveClusterResponse) -> Self {
271        Self {
272            results: proto
273                .messages
274                .into_iter()
275                .map(EtcdLeaveClusterResult::from)
276                .collect(),
277        }
278    }
279}
280
281impl EtcdLeaveClusterResponse {
282    /// Check if the operation was successful.
283    #[must_use]
284    pub fn is_success(&self) -> bool {
285        !self.results.is_empty()
286    }
287}
288
289// =============================================================================
290// EtcdForfeitLeadership
291// =============================================================================
292
/// Request to forfeit etcd leadership.
///
/// Causes the current leader to step down and trigger a new election.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EtcdForfeitLeadershipRequest;
298
299impl EtcdForfeitLeadershipRequest {
300    /// Create a new request.
301    #[must_use]
302    pub fn new() -> Self {
303        Self
304    }
305}
306
307impl From<EtcdForfeitLeadershipRequest> for ProtoEtcdForfeitLeadershipRequest {
308    fn from(_req: EtcdForfeitLeadershipRequest) -> Self {
309        Self {}
310    }
311}
312
/// Result from forfeiting leadership.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdForfeitLeadershipResult {
    /// Node that processed this request.
    pub node: Option<String>,
    /// The member that forfeited leadership.
    pub member: String,
}
321
322impl From<ProtoEtcdForfeitLeadership> for EtcdForfeitLeadershipResult {
323    fn from(proto: ProtoEtcdForfeitLeadership) -> Self {
324        Self {
325            node: proto.metadata.map(|m| m.hostname),
326            member: proto.member,
327        }
328    }
329}
330
331/// Response from forfeiting leadership.
332#[derive(Debug, Clone)]
333pub struct EtcdForfeitLeadershipResponse {
334    /// Results from each node.
335    pub results: Vec<EtcdForfeitLeadershipResult>,
336}
337
338impl From<ProtoEtcdForfeitLeadershipResponse> for EtcdForfeitLeadershipResponse {
339    fn from(proto: ProtoEtcdForfeitLeadershipResponse) -> Self {
340        Self {
341            results: proto
342                .messages
343                .into_iter()
344                .map(EtcdForfeitLeadershipResult::from)
345                .collect(),
346        }
347    }
348}
349
350// =============================================================================
351// EtcdStatus
352// =============================================================================
353
/// Status of an etcd member.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdMemberStatus {
    /// Member ID.
    pub member_id: u64,
    /// etcd protocol version.
    pub protocol_version: String,
    /// etcd storage version.
    pub storage_version: String,
    /// Database size in bytes (signed on the wire).
    pub db_size: i64,
    /// Database size in use, in bytes (signed on the wire).
    pub db_size_in_use: i64,
    /// Current leader ID.
    pub leader: u64,
    /// Raft index.
    pub raft_index: u64,
    /// Raft term.
    pub raft_term: u64,
    /// Raft applied index.
    pub raft_applied_index: u64,
    /// Any errors reported.
    pub errors: Vec<String>,
    /// Whether this member is a learner.
    pub is_learner: bool,
}
380
381impl From<ProtoEtcdMemberStatus> for EtcdMemberStatus {
382    fn from(proto: ProtoEtcdMemberStatus) -> Self {
383        Self {
384            member_id: proto.member_id,
385            protocol_version: proto.protocol_version,
386            storage_version: proto.storage_version,
387            db_size: proto.db_size,
388            db_size_in_use: proto.db_size_in_use,
389            leader: proto.leader,
390            raft_index: proto.raft_index,
391            raft_term: proto.raft_term,
392            raft_applied_index: proto.raft_applied_index,
393            errors: proto.errors,
394            is_learner: proto.is_learner,
395        }
396    }
397}
398
399impl EtcdMemberStatus {
400    /// Check if this member is the leader.
401    #[must_use]
402    pub fn is_leader(&self) -> bool {
403        self.member_id == self.leader
404    }
405
406    /// Check if this member has any errors.
407    #[must_use]
408    pub fn has_errors(&self) -> bool {
409        !self.errors.is_empty()
410    }
411
412    /// Get the database size in human-readable format.
413    #[must_use]
414    pub fn db_size_human(&self) -> String {
415        humanize_bytes(self.db_size as u64)
416    }
417}
418
419/// Result from status request.
420#[derive(Debug, Clone)]
421pub struct EtcdStatusResult {
422    /// Node that returned this status.
423    pub node: Option<String>,
424    /// Member status.
425    pub member_status: Option<EtcdMemberStatus>,
426}
427
428impl From<ProtoEtcdStatus> for EtcdStatusResult {
429    fn from(proto: ProtoEtcdStatus) -> Self {
430        Self {
431            node: proto.metadata.map(|m| m.hostname),
432            member_status: proto.member_status.map(EtcdMemberStatus::from),
433        }
434    }
435}
436
437/// Response from status request.
438#[derive(Debug, Clone)]
439pub struct EtcdStatusResponse {
440    /// Results from each node.
441    pub results: Vec<EtcdStatusResult>,
442}
443
444impl From<ProtoEtcdStatusResponse> for EtcdStatusResponse {
445    fn from(proto: ProtoEtcdStatusResponse) -> Self {
446        Self {
447            results: proto
448                .messages
449                .into_iter()
450                .map(EtcdStatusResult::from)
451                .collect(),
452        }
453    }
454}
455
456impl EtcdStatusResponse {
457    /// Get the first member status.
458    #[must_use]
459    pub fn first(&self) -> Option<&EtcdMemberStatus> {
460        self.results.first().and_then(|r| r.member_status.as_ref())
461    }
462}
463
464// =============================================================================
465// EtcdAlarm
466// =============================================================================
467
/// Kinds of alarm etcd can raise for a member.
///
/// Decoded from the raw protobuf integer via `From<i32>`: `1` is NOSPACE,
/// `2` is CORRUPT, and every other value is treated as no alarm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EtcdAlarmType {
    /// No alarm (also used for unrecognised wire values).
    None,
    /// The database has run out of space.
    NoSpace,
    /// Data corruption was detected.
    Corrupt,
}
478
479impl From<i32> for EtcdAlarmType {
480    fn from(value: i32) -> Self {
481        match value {
482            1 => Self::NoSpace,
483            2 => Self::Corrupt,
484            _ => Self::None,
485        }
486    }
487}
488
489impl std::fmt::Display for EtcdAlarmType {
490    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
491        match self {
492            EtcdAlarmType::None => write!(f, "none"),
493            EtcdAlarmType::NoSpace => write!(f, "NOSPACE"),
494            EtcdAlarmType::Corrupt => write!(f, "CORRUPT"),
495        }
496    }
497}
498
499/// Alarm for an etcd member.
500#[derive(Debug, Clone)]
501pub struct EtcdMemberAlarm {
502    /// Member ID with the alarm.
503    pub member_id: u64,
504    /// Type of alarm.
505    pub alarm: EtcdAlarmType,
506}
507
508impl From<ProtoEtcdMemberAlarm> for EtcdMemberAlarm {
509    fn from(proto: ProtoEtcdMemberAlarm) -> Self {
510        Self {
511            member_id: proto.member_id,
512            alarm: EtcdAlarmType::from(proto.alarm),
513        }
514    }
515}
516
517/// Result from alarm list request.
518#[derive(Debug, Clone)]
519pub struct EtcdAlarmResult {
520    /// Node that returned this result.
521    pub node: Option<String>,
522    /// Alarms for each member.
523    pub member_alarms: Vec<EtcdMemberAlarm>,
524}
525
526impl From<ProtoEtcdAlarm> for EtcdAlarmResult {
527    fn from(proto: ProtoEtcdAlarm) -> Self {
528        Self {
529            node: proto.metadata.map(|m| m.hostname),
530            member_alarms: proto
531                .member_alarms
532                .into_iter()
533                .map(EtcdMemberAlarm::from)
534                .collect(),
535        }
536    }
537}
538
539/// Response from alarm list request.
540#[derive(Debug, Clone)]
541pub struct EtcdAlarmListResponse {
542    /// Results from each node.
543    pub results: Vec<EtcdAlarmResult>,
544}
545
546impl From<ProtoEtcdAlarmListResponse> for EtcdAlarmListResponse {
547    fn from(proto: ProtoEtcdAlarmListResponse) -> Self {
548        Self {
549            results: proto
550                .messages
551                .into_iter()
552                .map(EtcdAlarmResult::from)
553                .collect(),
554        }
555    }
556}
557
558impl EtcdAlarmListResponse {
559    /// Check if there are any active alarms.
560    #[must_use]
561    pub fn has_alarms(&self) -> bool {
562        self.results.iter().any(|r| {
563            r.member_alarms
564                .iter()
565                .any(|a| a.alarm != EtcdAlarmType::None)
566        })
567    }
568
569    /// Get all active alarms.
570    #[must_use]
571    pub fn active_alarms(&self) -> Vec<&EtcdMemberAlarm> {
572        self.results
573            .iter()
574            .flat_map(|r| r.member_alarms.iter())
575            .filter(|a| a.alarm != EtcdAlarmType::None)
576            .collect()
577    }
578}
579
580// =============================================================================
581// EtcdAlarmDisarm
582// =============================================================================
583
584/// Result from disarming alarms.
585#[derive(Debug, Clone)]
586pub struct EtcdAlarmDisarmResult {
587    /// Node that processed this request.
588    pub node: Option<String>,
589    /// Alarms that were disarmed.
590    pub member_alarms: Vec<EtcdMemberAlarm>,
591}
592
593impl From<ProtoEtcdAlarmDisarm> for EtcdAlarmDisarmResult {
594    fn from(proto: ProtoEtcdAlarmDisarm) -> Self {
595        Self {
596            node: proto.metadata.map(|m| m.hostname),
597            member_alarms: proto
598                .member_alarms
599                .into_iter()
600                .map(EtcdMemberAlarm::from)
601                .collect(),
602        }
603    }
604}
605
606/// Response from disarming alarms.
607#[derive(Debug, Clone)]
608pub struct EtcdAlarmDisarmResponse {
609    /// Results from each node.
610    pub results: Vec<EtcdAlarmDisarmResult>,
611}
612
613impl From<ProtoEtcdAlarmDisarmResponse> for EtcdAlarmDisarmResponse {
614    fn from(proto: ProtoEtcdAlarmDisarmResponse) -> Self {
615        Self {
616            results: proto
617                .messages
618                .into_iter()
619                .map(EtcdAlarmDisarmResult::from)
620                .collect(),
621        }
622    }
623}
624
625// =============================================================================
626// EtcdDefragment
627// =============================================================================
628
/// Result from defragmentation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdDefragmentResult {
    /// Node that was defragmented.
    pub node: Option<String>,
}
635
636impl From<ProtoEtcdDefragment> for EtcdDefragmentResult {
637    fn from(proto: ProtoEtcdDefragment) -> Self {
638        Self {
639            node: proto.metadata.map(|m| m.hostname),
640        }
641    }
642}
643
644/// Response from defragmentation.
645#[derive(Debug, Clone)]
646pub struct EtcdDefragmentResponse {
647    /// Results from each node.
648    pub results: Vec<EtcdDefragmentResult>,
649}
650
651impl From<ProtoEtcdDefragmentResponse> for EtcdDefragmentResponse {
652    fn from(proto: ProtoEtcdDefragmentResponse) -> Self {
653        Self {
654            results: proto
655                .messages
656                .into_iter()
657                .map(EtcdDefragmentResult::from)
658                .collect(),
659        }
660    }
661}
662
663impl EtcdDefragmentResponse {
664    /// Check if defragmentation was successful.
665    #[must_use]
666    pub fn is_success(&self) -> bool {
667        !self.results.is_empty()
668    }
669}
670
671// =============================================================================
672// Helpers
673// =============================================================================
674
/// Formats a byte count using 1024-based units, with two decimals.
///
/// Counts below 1024 print as an exact integer with a `B` suffix. GB is the
/// largest unit, so very large counts are still expressed in GB.
fn humanize_bytes(bytes: u64) -> String {
    // Largest unit first, so the first threshold that fits wins.
    const UNITS: [(u64, &str); 3] = [(1 << 30, "GB"), (1 << 20, "MB"), (1 << 10, "KB")];

    match UNITS.iter().find(|(size, _)| bytes >= *size) {
        Some(&(size, suffix)) => format!("{:.2} {suffix}", bytes as f64 / size as f64),
        None => format!("{bytes} B"),
    }
}
690
691// =============================================================================
692// Tests
693// =============================================================================
694
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_etcd_member_list_request() {
        let req = EtcdMemberListRequest::new();
        assert!(!req.query_local);

        let req = EtcdMemberListRequest::local();
        assert!(req.query_local);
    }

    #[test]
    fn test_etcd_remove_member_by_id_request() {
        let req = EtcdRemoveMemberByIdRequest::new(12345);
        assert_eq!(req.member_id, 12345);

        let proto: ProtoEtcdRemoveMemberByIdRequest = req.into();
        assert_eq!(proto.member_id, 12345);
    }

    #[test]
    fn test_etcd_alarm_type() {
        assert_eq!(EtcdAlarmType::from(0), EtcdAlarmType::None);
        assert_eq!(EtcdAlarmType::from(1), EtcdAlarmType::NoSpace);
        assert_eq!(EtcdAlarmType::from(2), EtcdAlarmType::Corrupt);
        assert_eq!(EtcdAlarmType::from(99), EtcdAlarmType::None);

        assert_eq!(EtcdAlarmType::NoSpace.to_string(), "NOSPACE");
        assert_eq!(EtcdAlarmType::Corrupt.to_string(), "CORRUPT");
    }

    #[test]
    fn test_etcd_member_status_is_leader() {
        let status = EtcdMemberStatus {
            member_id: 100,
            protocol_version: "3.5.0".to_string(),
            storage_version: "3.5".to_string(),
            db_size: 10 * 1024 * 1024,
            db_size_in_use: 5 * 1024 * 1024,
            leader: 100,
            raft_index: 1000,
            raft_term: 5,
            raft_applied_index: 999,
            errors: vec![],
            is_learner: false,
        };

        assert!(status.is_leader());
        assert!(!status.has_errors());
        assert_eq!(status.db_size_human(), "10.00 MB");
    }

    #[test]
    fn test_humanize_bytes() {
        assert_eq!(humanize_bytes(500), "500 B");
        assert_eq!(humanize_bytes(1024), "1.00 KB");
        assert_eq!(humanize_bytes(1536), "1.50 KB");
        assert_eq!(humanize_bytes(1024 * 1024), "1.00 MB");
        assert_eq!(humanize_bytes(1024 * 1024 * 1024), "1.00 GB");
    }

    #[test]
    fn test_etcd_leave_cluster_request() {
        let req = EtcdLeaveClusterRequest::new();
        let _proto: ProtoEtcdLeaveClusterRequest = req.into();
    }

    #[test]
    fn test_etcd_forfeit_leadership_request() {
        let req = EtcdForfeitLeadershipRequest::new();
        let _proto: ProtoEtcdForfeitLeadershipRequest = req.into();
    }

    /// Builds a minimal member with the given ID and hostname.
    fn member(id: u64, hostname: &str) -> EtcdMember {
        EtcdMember {
            id,
            hostname: hostname.to_string(),
            peer_urls: vec![],
            client_urls: vec![],
            is_learner: false,
        }
    }

    #[test]
    fn test_member_list_response_dedup_and_lookup() {
        // Two nodes both report member 2; it must appear only once.
        let resp = EtcdMemberListResponse {
            results: vec![
                EtcdMembersResult {
                    node: Some("cp-1".to_string()),
                    members: vec![member(1, "cp-1"), member(2, "cp-2")],
                },
                EtcdMembersResult {
                    node: Some("cp-2".to_string()),
                    members: vec![member(2, "cp-2"), member(3, "cp-3")],
                },
            ],
        };

        let ids: Vec<u64> = resp.all_members().iter().map(|m| m.id).collect();
        assert_eq!(ids, vec![1, 2, 3]);
        assert_eq!(resp.find_by_id(3), Some(&member(3, "cp-3")));
        assert_eq!(resp.find_by_hostname("cp-2").map(|m| m.id), Some(2));
        assert!(resp.find_by_hostname("missing").is_none());
        assert!(resp.find_by_id(42).is_none());
    }

    #[test]
    fn test_alarm_list_filters_none() {
        let quiet = EtcdAlarmListResponse {
            results: vec![EtcdAlarmResult {
                node: None,
                member_alarms: vec![EtcdMemberAlarm {
                    member_id: 1,
                    alarm: EtcdAlarmType::None,
                }],
            }],
        };
        assert!(!quiet.has_alarms());
        assert!(quiet.active_alarms().is_empty());

        let noisy = EtcdAlarmListResponse {
            results: vec![EtcdAlarmResult {
                node: Some("cp-1".to_string()),
                member_alarms: vec![
                    EtcdMemberAlarm {
                        member_id: 1,
                        alarm: EtcdAlarmType::None,
                    },
                    EtcdMemberAlarm {
                        member_id: 2,
                        alarm: EtcdAlarmType::NoSpace,
                    },
                ],
            }],
        };
        assert!(noisy.has_alarms());
        let active: Vec<(u64, EtcdAlarmType)> = noisy
            .active_alarms()
            .iter()
            .map(|a| (a.member_id, a.alarm))
            .collect();
        assert_eq!(active, vec![(2, EtcdAlarmType::NoSpace)]);
        assert_eq!(EtcdAlarmType::None.to_string(), "none");
    }

    #[test]
    fn test_status_response_first() {
        let empty = EtcdStatusResponse { results: vec![] };
        assert!(empty.first().is_none());

        let no_status = EtcdStatusResponse {
            results: vec![EtcdStatusResult {
                node: Some("cp-1".to_string()),
                member_status: None,
            }],
        };
        assert!(no_status.first().is_none());
    }

    #[test]
    fn test_is_success_requires_a_response() {
        assert!(!EtcdRemoveMemberByIdResponse { results: vec![] }.is_success());
        assert!(EtcdRemoveMemberByIdResponse {
            results: vec![EtcdRemoveMemberByIdResult { node: None }],
        }
        .is_success());
        assert!(!EtcdLeaveClusterResponse { results: vec![] }.is_success());
        assert!(!EtcdDefragmentResponse { results: vec![] }.is_success());
        assert!(EtcdDefragmentResponse {
            results: vec![EtcdDefragmentResult {
                node: Some("cp-1".to_string()),
            }],
        }
        .is_success());
    }
}