Skip to main content

talos_api_rs/client/
mod.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3mod node_target;
4pub mod pool;
5#[cfg(test)]
6mod tests;
7
8pub use node_target::{NodeTarget, NODE_METADATA_KEY};
9
10use crate::api::machine::machine_service_client::MachineServiceClient;
11use crate::api::machine::ApplyConfigurationRequest as ProtoApplyConfigRequest;
12use crate::api::machine::BootstrapRequest as ProtoBootstrapRequest;
13use crate::api::machine::CopyRequest as ProtoCopyRequest;
14use crate::api::machine::DiskUsageRequest as ProtoDiskUsageRequest;
15use crate::api::machine::DmesgRequest as ProtoDmesgRequest;
16use crate::api::machine::EtcdForfeitLeadershipRequest as ProtoEtcdForfeitLeadershipRequest;
17use crate::api::machine::EtcdLeaveClusterRequest as ProtoEtcdLeaveClusterRequest;
18use crate::api::machine::EtcdMemberListRequest as ProtoEtcdMemberListRequest;
19use crate::api::machine::EtcdRemoveMemberByIdRequest as ProtoEtcdRemoveMemberByIdRequest;
20use crate::api::machine::GenerateClientConfigurationRequest as ProtoGenerateClientConfigRequest;
21use crate::api::machine::ListRequest as ProtoListRequest;
22use crate::api::machine::LogsRequest as ProtoLogsRequest;
23use crate::api::machine::NetstatRequest as ProtoNetstatRequest;
24use crate::api::machine::PacketCaptureRequest as ProtoPacketCaptureRequest;
25use crate::api::machine::ReadRequest as ProtoReadRequest;
26use crate::api::machine::ResetRequest as ProtoResetRequest;
27use crate::api::machine::RollbackRequest as ProtoRollbackRequest;
28use crate::api::machine::ServiceRestartRequest as ProtoServiceRestartRequest;
29use crate::api::machine::ServiceStartRequest as ProtoServiceStartRequest;
30use crate::api::machine::ServiceStopRequest as ProtoServiceStopRequest;
31use crate::api::machine::UpgradeRequest as ProtoUpgradeRequest;
32use crate::api::version::version_service_client::VersionServiceClient;
33use crate::error::Result;
34use crate::resources::{
35    ApplyConfigurationRequest, ApplyConfigurationResponse, BootstrapRequest, BootstrapResponse,
36    CopyRequest, CopyResponse, CpuInfoResponse, DiskStatsResponse, DiskUsageInfo, DiskUsageRequest,
37    DiskUsageResponse, DmesgRequest, DmesgResponse, EtcdAlarmDisarmResponse, EtcdAlarmListResponse,
38    EtcdDefragmentResponse, EtcdForfeitLeadershipRequest, EtcdForfeitLeadershipResponse,
39    EtcdLeaveClusterRequest, EtcdLeaveClusterResponse, EtcdMemberListRequest,
40    EtcdMemberListResponse, EtcdRemoveMemberByIdRequest, EtcdRemoveMemberByIdResponse,
41    EtcdStatusResponse, FileInfo, GenerateClientConfigurationRequest,
42    GenerateClientConfigurationResponse, KubeconfigResponse, ListRequest, ListResponse,
43    LoadAvgResponse, LogsRequest, LogsResponse, MemoryResponse, MountsResponse, NetstatRequest,
44    NetstatResponse, NetworkDeviceStatsResponse, PacketCaptureRequest, PacketCaptureResponse,
45    ProcessesResponse, ReadRequest, ReadResponse, ResetRequest, ResetResponse, RollbackResponse,
46    ServiceRestartRequest, ServiceRestartResponse, ServiceStartRequest, ServiceStartResponse,
47    ServiceStopRequest, ServiceStopResponse, UpgradeRequest, UpgradeResponse,
48};
49use hyper_util::rt::TokioIo;
50use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName};
51use std::sync::Arc;
52use std::time::Duration;
53use tonic::transport::{Channel, Endpoint};
54
55/// Configuration for the Talos API client.
56#[derive(Clone, Debug)]
57pub struct TalosClientConfig {
58    /// The gRPC endpoint URL.
59    pub endpoint: String,
60    /// Path to client certificate.
61    pub crt_path: Option<String>,
62    /// Path to client private key.
63    pub key_path: Option<String>,
64    /// Path to CA certificate.
65    pub ca_path: Option<String>,
66    /// If true, skips TLS verification (insecure).
67    pub insecure: bool,
68    /// Connection timeout for establishing the gRPC channel.
69    pub connect_timeout: Option<Duration>,
70    /// Request timeout for individual RPC calls.
71    pub request_timeout: Option<Duration>,
72    /// Keepalive interval for long-running connections.
73    pub keepalive_interval: Option<Duration>,
74    /// Keepalive timeout.
75    pub keepalive_timeout: Option<Duration>,
76}
77
78impl Default for TalosClientConfig {
79    fn default() -> Self {
80        Self {
81            endpoint: "https://127.0.0.1:50000".to_string(),
82            crt_path: None,
83            key_path: None,
84            ca_path: None,
85            insecure: false,
86            connect_timeout: Some(Duration::from_secs(10)),
87            request_timeout: Some(Duration::from_secs(30)),
88            keepalive_interval: Some(Duration::from_secs(30)),
89            keepalive_timeout: Some(Duration::from_secs(10)),
90        }
91    }
92}
93
94impl TalosClientConfig {
95    /// Create a new configuration with an endpoint.
96    #[must_use]
97    pub fn new(endpoint: impl Into<String>) -> Self {
98        Self {
99            endpoint: endpoint.into(),
100            ..Default::default()
101        }
102    }
103
104    /// Create a builder for more complex configuration.
105    #[must_use]
106    pub fn builder(endpoint: impl Into<String>) -> TalosClientConfigBuilder {
107        TalosClientConfigBuilder::new(endpoint)
108    }
109
110    /// Set client certificate path.
111    #[must_use]
112    pub fn with_client_cert(mut self, crt_path: impl Into<String>) -> Self {
113        self.crt_path = Some(crt_path.into());
114        self
115    }
116
117    /// Set client key path.
118    #[must_use]
119    pub fn with_client_key(mut self, key_path: impl Into<String>) -> Self {
120        self.key_path = Some(key_path.into());
121        self
122    }
123
124    /// Set CA certificate path.
125    #[must_use]
126    pub fn with_ca(mut self, ca_path: impl Into<String>) -> Self {
127        self.ca_path = Some(ca_path.into());
128        self
129    }
130
131    /// Enable insecure mode (skip TLS verification).
132    #[must_use]
133    pub fn insecure(mut self) -> Self {
134        self.insecure = true;
135        self
136    }
137
138    /// Set connect timeout.
139    #[must_use]
140    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
141        self.connect_timeout = Some(timeout);
142        self
143    }
144
145    /// Set request timeout.
146    #[must_use]
147    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
148        self.request_timeout = Some(timeout);
149        self
150    }
151
152    /// Disable all timeouts.
153    #[must_use]
154    pub fn no_timeout(mut self) -> Self {
155        self.connect_timeout = None;
156        self.request_timeout = None;
157        self
158    }
159}
160
161/// Builder for `TalosClientConfig`.
162#[derive(Debug, Clone)]
163pub struct TalosClientConfigBuilder {
164    endpoint: String,
165    crt_path: Option<String>,
166    key_path: Option<String>,
167    ca_path: Option<String>,
168    insecure: bool,
169    connect_timeout: Option<Duration>,
170    request_timeout: Option<Duration>,
171    keepalive_interval: Option<Duration>,
172    keepalive_timeout: Option<Duration>,
173}
174
175impl TalosClientConfigBuilder {
176    /// Create a new builder.
177    #[must_use]
178    pub fn new(endpoint: impl Into<String>) -> Self {
179        Self {
180            endpoint: endpoint.into(),
181            crt_path: None,
182            key_path: None,
183            ca_path: None,
184            insecure: false,
185            connect_timeout: Some(Duration::from_secs(10)),
186            request_timeout: Some(Duration::from_secs(30)),
187            keepalive_interval: Some(Duration::from_secs(30)),
188            keepalive_timeout: Some(Duration::from_secs(10)),
189        }
190    }
191
192    /// Set client certificate path.
193    #[must_use]
194    pub fn client_cert(mut self, path: impl Into<String>) -> Self {
195        self.crt_path = Some(path.into());
196        self
197    }
198
199    /// Set client key path.
200    #[must_use]
201    pub fn client_key(mut self, path: impl Into<String>) -> Self {
202        self.key_path = Some(path.into());
203        self
204    }
205
206    /// Set CA certificate path.
207    #[must_use]
208    pub fn ca_cert(mut self, path: impl Into<String>) -> Self {
209        self.ca_path = Some(path.into());
210        self
211    }
212
213    /// Enable insecure mode.
214    #[must_use]
215    pub fn insecure(mut self) -> Self {
216        self.insecure = true;
217        self
218    }
219
220    /// Set connect timeout.
221    #[must_use]
222    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
223        self.connect_timeout = Some(timeout);
224        self
225    }
226
227    /// Set request timeout.
228    #[must_use]
229    pub fn request_timeout(mut self, timeout: Duration) -> Self {
230        self.request_timeout = Some(timeout);
231        self
232    }
233
234    /// Set keepalive settings.
235    #[must_use]
236    pub fn keepalive(mut self, interval: Duration, timeout: Duration) -> Self {
237        self.keepalive_interval = Some(interval);
238        self.keepalive_timeout = Some(timeout);
239        self
240    }
241
242    /// Disable timeouts.
243    #[must_use]
244    pub fn no_timeout(mut self) -> Self {
245        self.connect_timeout = None;
246        self.request_timeout = None;
247        self
248    }
249
250    /// Build the configuration.
251    #[must_use]
252    pub fn build(self) -> TalosClientConfig {
253        TalosClientConfig {
254            endpoint: self.endpoint,
255            crt_path: self.crt_path,
256            key_path: self.key_path,
257            ca_path: self.ca_path,
258            insecure: self.insecure,
259            connect_timeout: self.connect_timeout,
260            request_timeout: self.request_timeout,
261            keepalive_interval: self.keepalive_interval,
262            keepalive_timeout: self.keepalive_timeout,
263        }
264    }
265}
266
267#[derive(Clone)]
268pub struct TalosClient {
269    #[allow(dead_code)] // TODO: Remove when config is used
270    config: TalosClientConfig,
271    channel: Channel,
272    /// Current node target for API calls
273    node_target: NodeTarget,
274}
275
276impl TalosClient {
277    pub async fn new(config: TalosClientConfig) -> Result<Self> {
278        // Install ring as default crypto provider (supports ED25519)
279        let _ = rustls::crypto::ring::default_provider().install_default();
280
281        // Check if using plain HTTP (no TLS)
282        let is_http = config.endpoint.starts_with("http://");
283
284        let channel = if is_http {
285            // Plain HTTP - no TLS at all
286            Self::create_http_channel(&config).await?
287        } else if config.insecure {
288            Self::create_insecure_channel(&config).await?
289        } else {
290            Self::create_mtls_channel(&config).await?
291        };
292
293        Ok(Self {
294            config,
295            channel,
296            node_target: NodeTarget::Default,
297        })
298    }
299
300    /// Create a client from a TalosConfig context
301    ///
302    /// This loads credentials from the talosconfig and connects to the first endpoint.
303    ///
304    /// # Example
305    ///
306    /// ```no_run
307    /// use talos_api_rs::{TalosClient, config::TalosConfig};
308    ///
309    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
310    /// let config = TalosConfig::load_with_env()?;
311    /// let client = TalosClient::from_talosconfig(&config, None).await?;
312    /// # Ok(())
313    /// # }
314    /// ```
315    ///
316    /// # Arguments
317    ///
318    /// * `config` - The loaded TalosConfig
319    /// * `context_name` - Optional context name to use (defaults to active context)
320    pub async fn from_talosconfig(
321        config: &crate::config::TalosConfig,
322        context_name: Option<&str>,
323    ) -> Result<Self> {
324        let context = if let Some(name) = context_name {
325            config.get_context(name).ok_or_else(|| {
326                crate::error::TalosError::Config(format!("Context '{}' not found", name))
327            })?
328        } else {
329            config.active_context().ok_or_else(|| {
330                crate::error::TalosError::Config("No active context in talosconfig".to_string())
331            })?
332        };
333
334        let endpoint = context.first_endpoint().ok_or_else(|| {
335            crate::error::TalosError::Config("No endpoints in context".to_string())
336        })?;
337
338        // Build endpoint URL
339        let endpoint_url = if endpoint.contains("://") {
340            endpoint.clone()
341        } else if endpoint.contains(':') {
342            format!("https://{}", endpoint)
343        } else {
344            format!("https://{}:50000", endpoint)
345        };
346
347        // Build client config
348        let mut client_config = TalosClientConfig::new(&endpoint_url);
349
350        // Write certs to temp files if provided inline
351        if let (Some(ca), Some(crt), Some(key)) = (&context.ca, &context.crt, &context.key) {
352            let temp_dir = std::env::temp_dir().join("talos-api-rs");
353            std::fs::create_dir_all(&temp_dir).map_err(|e| {
354                crate::error::TalosError::Config(format!("Failed to create temp dir: {}", e))
355            })?;
356
357            let ca_path = temp_dir.join("ca.crt");
358            let crt_path = temp_dir.join("client.crt");
359            let key_path = temp_dir.join("client.key");
360
361            std::fs::write(&ca_path, ca).map_err(|e| {
362                crate::error::TalosError::Config(format!("Failed to write CA cert: {}", e))
363            })?;
364            std::fs::write(&crt_path, crt).map_err(|e| {
365                crate::error::TalosError::Config(format!("Failed to write client cert: {}", e))
366            })?;
367            std::fs::write(&key_path, key).map_err(|e| {
368                crate::error::TalosError::Config(format!("Failed to write client key: {}", e))
369            })?;
370
371            client_config = client_config
372                .with_ca(ca_path.to_string_lossy().to_string())
373                .with_client_cert(crt_path.to_string_lossy().to_string())
374                .with_client_key(key_path.to_string_lossy().to_string());
375        }
376
377        let mut client = Self::new(client_config).await?;
378
379        // Set node target from context if available
380        if let Some(nodes) = &context.nodes {
381            if !nodes.is_empty() {
382                client.node_target = NodeTarget::from(nodes.clone());
383            }
384        }
385
386        Ok(client)
387    }
388
389    /// Create a new client targeting a specific node
390    ///
391    /// # Example
392    ///
393    /// ```ignore
394    /// use talos_api_rs::{TalosClient, TalosClientConfig};
395    /// use talos_api_rs::client::NodeTarget;
396    ///
397    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
398    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
399    ///
400    /// // Target a specific node for API calls
401    /// let targeted = client.with_node(NodeTarget::single("192.168.1.10"));
402    /// let hostname = targeted.hostname().await?;
403    /// # Ok(())
404    /// # }
405    /// ```
406    #[must_use]
407    pub fn with_node(&self, target: NodeTarget) -> Self {
408        Self {
409            config: self.config.clone(),
410            channel: self.channel.clone(),
411            node_target: target,
412        }
413    }
414
415    /// Create a new client targeting multiple nodes
416    ///
417    /// Convenience method for cluster-wide operations.
418    #[must_use]
419    pub fn with_nodes(&self, nodes: impl IntoIterator<Item = impl Into<String>>) -> Self {
420        self.with_node(NodeTarget::multiple(nodes))
421    }
422
423    /// Get the current node target
424    #[must_use]
425    pub fn node_target(&self) -> &NodeTarget {
426        &self.node_target
427    }
428
429    /// Create a plain HTTP channel (no TLS)
430    async fn create_http_channel(config: &TalosClientConfig) -> Result<Channel> {
431        let mut endpoint = Channel::from_shared(config.endpoint.clone())
432            .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
433
434        // Apply timeout configuration
435        if let Some(timeout) = config.connect_timeout {
436            endpoint = endpoint.connect_timeout(timeout);
437        }
438        if let Some(timeout) = config.request_timeout {
439            endpoint = endpoint.timeout(timeout);
440        }
441        if let Some(interval) = config.keepalive_interval {
442            if let Some(ka_timeout) = config.keepalive_timeout {
443                endpoint = endpoint
444                    .http2_keep_alive_interval(interval)
445                    .keep_alive_timeout(ka_timeout);
446            }
447        }
448
449        let channel = endpoint.connect().await?;
450        Ok(channel)
451    }
452
453    /// Create an insecure channel (TLS without certificate verification)
454    async fn create_insecure_channel(config: &TalosClientConfig) -> Result<Channel> {
455        let tls_config = rustls::ClientConfig::builder()
456            .with_root_certificates(rustls::RootCertStore::empty())
457            .with_no_client_auth();
458
459        Self::connect_with_custom_tls(config, tls_config, true).await
460    }
461
462    /// Create an mTLS channel with full certificate verification
463    async fn create_mtls_channel(config: &TalosClientConfig) -> Result<Channel> {
464        // Load CA certificate
465        let root_store = if let Some(ca_path) = &config.ca_path {
466            let ca_pem = std::fs::read(ca_path).map_err(|e| {
467                crate::error::TalosError::Config(format!("Failed to read CA cert: {e}"))
468            })?;
469            let mut root_store = rustls::RootCertStore::empty();
470            let certs = Self::load_pem_certs(&ca_pem)?;
471            for cert in certs {
472                root_store.add(cert).map_err(|e| {
473                    crate::error::TalosError::Config(format!("Failed to add CA cert: {e}"))
474                })?;
475            }
476            root_store
477        } else {
478            // Use system roots if no CA provided
479            let mut root_store = rustls::RootCertStore::empty();
480            root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
481            root_store
482        };
483
484        // Build TLS config with or without client auth
485        let tls_config =
486            if let (Some(crt_path), Some(key_path)) = (&config.crt_path, &config.key_path) {
487                // mTLS with client certificate
488                let cert_pem = std::fs::read(crt_path).map_err(|e| {
489                    crate::error::TalosError::Config(format!("Failed to read client cert: {e}"))
490                })?;
491                let key_pem = std::fs::read(key_path).map_err(|e| {
492                    crate::error::TalosError::Config(format!("Failed to read client key: {e}"))
493                })?;
494
495                let client_certs = Self::load_pem_certs(&cert_pem)?;
496                let client_key = Self::load_pem_key(&key_pem)?;
497
498                rustls::ClientConfig::builder()
499                    .with_root_certificates(root_store)
500                    .with_client_auth_cert(client_certs, client_key)
501                    .map_err(|e| {
502                        crate::error::TalosError::Config(format!(
503                            "Failed to configure client auth: {e}"
504                        ))
505                    })?
506            } else {
507                // TLS without client auth
508                rustls::ClientConfig::builder()
509                    .with_root_certificates(root_store)
510                    .with_no_client_auth()
511            };
512
513        Self::connect_with_custom_tls(config, tls_config, false).await
514    }
515
516    /// Connect using a custom rustls TLS configuration
517    async fn connect_with_custom_tls(
518        config: &TalosClientConfig,
519        mut tls_config: rustls::ClientConfig,
520        skip_verification: bool,
521    ) -> Result<Channel> {
522        // Override verifier for insecure mode
523        if skip_verification {
524            tls_config
525                .dangerous()
526                .set_certificate_verifier(Arc::new(NoVerifier));
527        }
528
529        // gRPC requires ALPN h2
530        tls_config.alpn_protocols = vec![b"h2".to_vec()];
531        let tls_config = Arc::new(tls_config);
532        let connector = tokio_rustls::TlsConnector::from(tls_config);
533
534        // Extract host for SNI
535        let endpoint_url = if config.endpoint.starts_with("http") {
536            config.endpoint.clone()
537        } else {
538            format!("https://{}", config.endpoint)
539        };
540        let parsed_url = url::Url::parse(&endpoint_url)
541            .map_err(|e| crate::error::TalosError::Config(format!("Invalid endpoint URL: {e}")))?;
542        let host = parsed_url
543            .host_str()
544            .ok_or_else(|| crate::error::TalosError::Config("No host in endpoint".to_string()))?
545            .to_string();
546        let port = parsed_url.port().unwrap_or(50000);
547
548        // For custom connector, use http:// scheme (we handle TLS ourselves)
549        let endpoint_for_connector = format!("http://{}:{}", host, port);
550
551        // Build endpoint with timeout configuration
552        let mut endpoint = Endpoint::from_shared(endpoint_for_connector)
553            .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
554
555        // Apply timeout configuration
556        if let Some(timeout) = config.connect_timeout {
557            endpoint = endpoint.connect_timeout(timeout);
558        }
559        if let Some(timeout) = config.request_timeout {
560            endpoint = endpoint.timeout(timeout);
561        }
562        if let Some(interval) = config.keepalive_interval {
563            if let Some(ka_timeout) = config.keepalive_timeout {
564                endpoint = endpoint
565                    .http2_keep_alive_interval(interval)
566                    .keep_alive_timeout(ka_timeout);
567            }
568        }
569
570        let channel = endpoint
571            .connect_with_connector(tower::service_fn(move |uri: tonic::transport::Uri| {
572                let connector = connector.clone();
573                let host = host.clone();
574                async move {
575                    let uri_host = uri.host().unwrap_or("127.0.0.1");
576                    let uri_port = uri.port_u16().unwrap_or(50000);
577                    let addr = format!("{}:{}", uri_host, uri_port);
578
579                    let tcp = tokio::net::TcpStream::connect(addr).await?;
580
581                    // Use actual hostname for SNI (important for cert verification)
582                    let server_name = ServerName::try_from(host.clone())
583                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
584
585                    let tls_stream = connector.connect(server_name, tcp).await?;
586                    Ok::<_, std::io::Error>(TokioIo::new(tls_stream))
587                }
588            }))
589            .await?;
590
591        Ok(channel)
592    }
593
594    /// Load PEM-encoded certificates
595    #[allow(clippy::result_large_err)]
596    fn load_pem_certs(pem_data: &[u8]) -> Result<Vec<CertificateDer<'static>>> {
597        let mut reader = std::io::BufReader::new(pem_data);
598        let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
599            .collect::<std::result::Result<Vec<_>, _>>()
600            .map_err(|e| {
601                crate::error::TalosError::Config(format!("Failed to parse PEM certificates: {e}"))
602            })?;
603        if certs.is_empty() {
604            return Err(crate::error::TalosError::Config(
605                "No certificates found in PEM data".to_string(),
606            ));
607        }
608        Ok(certs)
609    }
610
611    /// Load PEM-encoded private key (supports RSA, EC, PKCS8, and ED25519)
612    #[allow(clippy::result_large_err)]
613    fn load_pem_key(pem_data: &[u8]) -> Result<PrivateKeyDer<'static>> {
614        // First, try standard PEM formats via rustls_pemfile
615        let mut reader = std::io::BufReader::new(pem_data);
616
617        loop {
618            match rustls_pemfile::read_one(&mut reader) {
619                Ok(Some(rustls_pemfile::Item::Pkcs1Key(key))) => {
620                    return Ok(PrivateKeyDer::Pkcs1(key));
621                }
622                Ok(Some(rustls_pemfile::Item::Pkcs8Key(key))) => {
623                    return Ok(PrivateKeyDer::Pkcs8(key));
624                }
625                Ok(Some(rustls_pemfile::Item::Sec1Key(key))) => {
626                    return Ok(PrivateKeyDer::Sec1(key));
627                }
628                Ok(Some(_)) => {
629                    // Skip other PEM items (certificates, etc.)
630                    continue;
631                }
632                Ok(None) => {
633                    break;
634                }
635                Err(e) => {
636                    return Err(crate::error::TalosError::Config(format!(
637                        "Failed to parse PEM key: {e}"
638                    )));
639                }
640            }
641        }
642
643        // Fallback: Handle non-standard "ED25519 PRIVATE KEY" PEM label
644        // Talos uses this format, which is PKCS#8-encoded but with a custom label
645        let pem_str = std::str::from_utf8(pem_data)
646            .map_err(|e| crate::error::TalosError::Config(format!("Invalid UTF-8 in key: {e}")))?;
647
648        if pem_str.contains("-----BEGIN ED25519 PRIVATE KEY-----") {
649            // Extract the base64 content between the headers
650            let start_marker = "-----BEGIN ED25519 PRIVATE KEY-----";
651            let end_marker = "-----END ED25519 PRIVATE KEY-----";
652
653            if let Some(start) = pem_str.find(start_marker) {
654                if let Some(end) = pem_str.find(end_marker) {
655                    let base64_content = &pem_str[start + start_marker.len()..end];
656                    let base64_clean: String = base64_content
657                        .chars()
658                        .filter(|c| !c.is_whitespace())
659                        .collect();
660
661                    let der_bytes = base64::Engine::decode(
662                        &base64::engine::general_purpose::STANDARD,
663                        &base64_clean,
664                    )
665                    .map_err(|e| {
666                        crate::error::TalosError::Config(format!(
667                            "Failed to decode ED25519 key: {e}"
668                        ))
669                    })?;
670
671                    // ED25519 PRIVATE KEY is actually PKCS#8 encoded
672                    return Ok(PrivateKeyDer::Pkcs8(
673                        rustls::pki_types::PrivatePkcs8KeyDer::from(der_bytes),
674                    ));
675                }
676            }
677        }
678
679        Err(crate::error::TalosError::Config(
680            "No private key found in PEM data".to_string(),
681        ))
682    }
683
684    /// Access the Version API group
685    pub fn version(&self) -> VersionServiceClient<Channel> {
686        VersionServiceClient::new(self.channel.clone())
687    }
688
689    /// Access the Machine API group
690    pub fn machine(&self) -> MachineServiceClient<Channel> {
691        MachineServiceClient::new(self.channel.clone())
692    }
693
694    /// Create a gRPC request with node targeting applied
695    fn make_request<T>(&self, inner: T) -> tonic::Request<T> {
696        self.node_target
697            .apply_to_request(tonic::Request::new(inner))
698    }
699
700    // ========================================================================
701    // High-level convenience methods
702    // ========================================================================
703
704    /// Apply a configuration to the node.
705    ///
706    /// This is a high-level wrapper around the `MachineService::ApplyConfiguration` RPC.
707    ///
708    /// # Example
709    ///
710    /// ```ignore
711    /// use talos_api_rs::{TalosClient, TalosClientConfig, ApplyConfigurationRequest, ApplyMode};
712    ///
713    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
714    /// let client = TalosClient::new(TalosClientConfig {
715    ///     endpoint: "https://192.168.1.100:50000".to_string(),
716    ///     insecure: true,
717    ///     ..Default::default()
718    /// }).await?;
719    ///
720    /// // Apply configuration in dry-run mode
721    /// let request = ApplyConfigurationRequest::builder()
722    ///     .config_yaml("machine:\n  type: worker")
723    ///     .mode(ApplyMode::NoReboot)
724    ///     .dry_run(true)
725    ///     .build();
726    ///
727    /// let response = client.apply_configuration(request).await?;
728    /// println!("Warnings: {:?}", response.all_warnings());
729    /// # Ok(())
730    /// # }
731    /// ```
732    ///
733    /// # Errors
734    ///
735    /// Returns an error if the RPC call fails or the configuration is invalid.
736    pub async fn apply_configuration(
737        &self,
738        request: ApplyConfigurationRequest,
739    ) -> Result<ApplyConfigurationResponse> {
740        let proto_request: ProtoApplyConfigRequest = request.into();
741        let grpc_request = self.make_request(proto_request);
742        let response = self
743            .machine()
744            .apply_configuration(grpc_request)
745            .await?
746            .into_inner();
747        Ok(response.into())
748    }
749
750    /// Apply a YAML configuration string to the node.
751    ///
752    /// Convenience method for simple configuration application.
753    ///
754    /// # Example
755    ///
756    /// ```ignore
757    /// use talos_api_rs::{TalosClient, TalosClientConfig, ApplyMode};
758    ///
759    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
760    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
761    /// let config_yaml = std::fs::read_to_string("machine.yaml")?;
762    /// let response = client.apply_configuration_yaml(&config_yaml, ApplyMode::Auto, false).await?;
763    /// # Ok(())
764    /// # }
765    /// ```
766    pub async fn apply_configuration_yaml(
767        &self,
768        yaml: &str,
769        mode: crate::ApplyMode,
770        dry_run: bool,
771    ) -> Result<ApplyConfigurationResponse> {
772        let request = ApplyConfigurationRequest::builder()
773            .config_yaml(yaml)
774            .mode(mode)
775            .dry_run(dry_run)
776            .build();
777        self.apply_configuration(request).await
778    }
779
780    /// Bootstrap the etcd cluster on this node.
781    ///
782    /// This initializes a new etcd cluster. **This should only be called ONCE**
783    /// on the first control-plane node when creating a new Talos cluster.
784    ///
785    /// # Example
786    ///
787    /// ```ignore
788    /// use talos_api_rs::{TalosClient, TalosClientConfig, BootstrapRequest};
789    ///
790    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
791    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
792    ///
793    /// // Bootstrap a new cluster
794    /// let response = client.bootstrap(BootstrapRequest::new()).await?;
795    /// println!("Bootstrap complete: {:?}", response.first());
796    /// # Ok(())
797    /// # }
798    /// ```
799    ///
800    /// # Recovery
801    ///
802    /// To recover from an etcd snapshot (uploaded via `EtcdRecover` RPC):
803    ///
804    /// ```ignore
805    /// use talos_api_rs::{TalosClient, TalosClientConfig, BootstrapRequest};
806    ///
807    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
808    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
809    /// let response = client.bootstrap(BootstrapRequest::recovery()).await?;
810    /// # Ok(())
811    /// # }
812    /// ```
813    ///
814    /// # Errors
815    ///
816    /// Returns an error if:
817    /// - The node is not a control-plane node
818    /// - etcd is already bootstrapped
819    /// - Network/connection issues
820    pub async fn bootstrap(&self, request: BootstrapRequest) -> Result<BootstrapResponse> {
821        let proto_request: ProtoBootstrapRequest = request.into();
822        let grpc_request = self.make_request(proto_request);
823        let response = self.machine().bootstrap(grpc_request).await?.into_inner();
824        Ok(response.into())
825    }
826
827    /// Bootstrap a new etcd cluster (convenience method).
828    ///
829    /// Equivalent to `bootstrap(BootstrapRequest::new())`.
830    pub async fn bootstrap_cluster(&self) -> Result<BootstrapResponse> {
831        self.bootstrap(BootstrapRequest::new()).await
832    }
833
834    /// Retrieve the kubeconfig from the cluster.
835    ///
836    /// This is a server-streaming RPC that retrieves the kubeconfig file
837    /// from a control-plane node. The kubeconfig can be used to access
838    /// the Kubernetes API of the cluster.
839    ///
840    /// # Example
841    ///
842    /// ```ignore
843    /// use talos_api_rs::{TalosClient, TalosClientConfig};
844    ///
845    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
846    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
847    ///
848    /// // Get kubeconfig
849    /// let kubeconfig = client.kubeconfig().await?;
850    /// println!("Kubeconfig from node: {:?}", kubeconfig.node);
851    ///
852    /// // Write to file
853    /// kubeconfig.write_to_file("kubeconfig.yaml")?;
854    ///
855    /// // Or get as string
856    /// let yaml = kubeconfig.as_str()?;
857    /// println!("{}", yaml);
858    /// # Ok(())
859    /// # }
860    /// ```
861    ///
862    /// # Errors
863    ///
864    /// Returns an error if:
865    /// - The node is not a control-plane node
866    /// - The cluster is not yet bootstrapped
867    /// - Network/connection issues
868    pub async fn kubeconfig(&self) -> Result<KubeconfigResponse> {
869        use tonic::codegen::tokio_stream::StreamExt;
870
871        let mut stream = self.machine().kubeconfig(()).await?.into_inner();
872
873        let mut data = Vec::new();
874        let mut node = None;
875
876        while let Some(chunk) = stream.next().await {
877            let chunk = chunk?;
878            // Capture node from first chunk with metadata
879            if node.is_none() {
880                if let Some(metadata) = &chunk.metadata {
881                    node = Some(metadata.hostname.clone());
882                }
883            }
884            data.extend(chunk.bytes);
885        }
886
887        Ok(KubeconfigResponse::new(data, node))
888    }
889
890    /// Reset a Talos node, optionally wiping disks.
891    ///
892    /// # Warning
893    ///
894    /// This is a **destructive** operation. The node will be reset and may
895    /// lose all data depending on the wipe mode configured.
896    ///
897    /// # Arguments
898    ///
899    /// * `request` - The reset request configuration
900    ///
901    /// # Example
902    ///
903    /// ```ignore
904    /// use talos_api_rs::{TalosClient, TalosClientConfig, ResetRequest};
905    ///
906    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
907    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
908    /// let client = TalosClient::new(config).await?;
909    ///
910    /// // Graceful reset (leaves etcd cluster first)
911    /// let response = client.reset(ResetRequest::graceful()).await?;
912    ///
913    /// // Force reset with full disk wipe
914    /// let response = client.reset(ResetRequest::force()).await?;
915    ///
916    /// // Custom reset
917    /// let response = client.reset(
918    ///     ResetRequest::builder()
919    ///         .graceful(true)
920    ///         .reboot(true)
921    ///         .build()
922    /// ).await?;
923    /// # Ok(())
924    /// # }
925    /// ```
926    pub async fn reset(&self, request: ResetRequest) -> Result<ResetResponse> {
927        let mut client = MachineServiceClient::new(self.channel.clone());
928
929        let proto_request: ProtoResetRequest = request.into();
930        let response = client.reset(proto_request).await?;
931        let inner = response.into_inner();
932
933        Ok(ResetResponse::from(inner))
934    }
935
936    /// Gracefully reset a Talos node.
937    ///
938    /// This is a convenience method that performs a graceful reset, which:
939    /// - Leaves the etcd cluster gracefully (for control plane nodes)
940    /// - Reboots after reset
941    /// - Does not wipe disks
942    ///
943    /// For more control, use [`reset`](Self::reset) with a custom [`ResetRequest`].
944    pub async fn reset_graceful(&self) -> Result<ResetResponse> {
945        self.reset(ResetRequest::graceful()).await
946    }
947
948    // =========================================================================
949    // etcd Operations
950    // =========================================================================
951
952    /// List etcd cluster members.
953    ///
954    /// # Example
955    ///
956    /// ```ignore
957    /// use talos_api_rs::{TalosClient, TalosClientConfig, EtcdMemberListRequest};
958    ///
959    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
960    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
961    /// let client = TalosClient::new(config).await?;
962    ///
963    /// let response = client.etcd_member_list(EtcdMemberListRequest::new()).await?;
964    /// for member in response.all_members() {
965    ///     println!("{}: {}", member.id, member.hostname);
966    /// }
967    /// # Ok(())
968    /// # }
969    /// ```
970    pub async fn etcd_member_list(
971        &self,
972        request: EtcdMemberListRequest,
973    ) -> Result<EtcdMemberListResponse> {
974        let mut client = MachineServiceClient::new(self.channel.clone());
975
976        let proto_request: ProtoEtcdMemberListRequest = request.into();
977        let response = client.etcd_member_list(proto_request).await?;
978        let inner = response.into_inner();
979
980        Ok(EtcdMemberListResponse::from(inner))
981    }
982
983    /// Remove an etcd member by ID.
984    ///
985    /// Use this to remove members that no longer have an associated Talos node.
986    /// For nodes that are still running, use [`etcd_leave_cluster`](Self::etcd_leave_cluster).
987    ///
988    /// # Example
989    ///
990    /// ```ignore
991    /// use talos_api_rs::{TalosClient, TalosClientConfig, EtcdRemoveMemberByIdRequest};
992    ///
993    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
994    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
995    /// let client = TalosClient::new(config).await?;
996    ///
997    /// // First, find the member ID
998    /// let members = client.etcd_member_list(Default::default()).await?;
999    /// if let Some(member) = members.find_by_hostname("old-node") {
1000    ///     client.etcd_remove_member_by_id(
1001    ///         EtcdRemoveMemberByIdRequest::new(member.id)
1002    ///     ).await?;
1003    /// }
1004    /// # Ok(())
1005    /// # }
1006    /// ```
1007    pub async fn etcd_remove_member_by_id(
1008        &self,
1009        request: EtcdRemoveMemberByIdRequest,
1010    ) -> Result<EtcdRemoveMemberByIdResponse> {
1011        let mut client = MachineServiceClient::new(self.channel.clone());
1012
1013        let proto_request: ProtoEtcdRemoveMemberByIdRequest = request.into();
1014        let response = client.etcd_remove_member_by_id(proto_request).await?;
1015        let inner = response.into_inner();
1016
1017        Ok(EtcdRemoveMemberByIdResponse::from(inner))
1018    }
1019
1020    /// Make a node leave the etcd cluster gracefully.
1021    ///
1022    /// This should be called on the node that is being removed.
1023    pub async fn etcd_leave_cluster(
1024        &self,
1025        request: EtcdLeaveClusterRequest,
1026    ) -> Result<EtcdLeaveClusterResponse> {
1027        let mut client = MachineServiceClient::new(self.channel.clone());
1028
1029        let proto_request: ProtoEtcdLeaveClusterRequest = request.into();
1030        let response = client.etcd_leave_cluster(proto_request).await?;
1031        let inner = response.into_inner();
1032
1033        Ok(EtcdLeaveClusterResponse::from(inner))
1034    }
1035
1036    /// Forfeit etcd leadership.
1037    ///
1038    /// Causes the current leader to step down and trigger a new election.
1039    pub async fn etcd_forfeit_leadership(
1040        &self,
1041        request: EtcdForfeitLeadershipRequest,
1042    ) -> Result<EtcdForfeitLeadershipResponse> {
1043        let mut client = MachineServiceClient::new(self.channel.clone());
1044
1045        let proto_request: ProtoEtcdForfeitLeadershipRequest = request.into();
1046        let response = client.etcd_forfeit_leadership(proto_request).await?;
1047        let inner = response.into_inner();
1048
1049        Ok(EtcdForfeitLeadershipResponse::from(inner))
1050    }
1051
1052    /// Get etcd status for the current member.
1053    pub async fn etcd_status(&self) -> Result<EtcdStatusResponse> {
1054        let mut client = MachineServiceClient::new(self.channel.clone());
1055
1056        let response = client.etcd_status(()).await?;
1057        let inner = response.into_inner();
1058
1059        Ok(EtcdStatusResponse::from(inner))
1060    }
1061
1062    /// List etcd alarms.
1063    pub async fn etcd_alarm_list(&self) -> Result<EtcdAlarmListResponse> {
1064        let mut client = MachineServiceClient::new(self.channel.clone());
1065
1066        let response = client.etcd_alarm_list(()).await?;
1067        let inner = response.into_inner();
1068
1069        Ok(EtcdAlarmListResponse::from(inner))
1070    }
1071
1072    /// Disarm etcd alarms.
1073    pub async fn etcd_alarm_disarm(&self) -> Result<EtcdAlarmDisarmResponse> {
1074        let mut client = MachineServiceClient::new(self.channel.clone());
1075
1076        let response = client.etcd_alarm_disarm(()).await?;
1077        let inner = response.into_inner();
1078
1079        Ok(EtcdAlarmDisarmResponse::from(inner))
1080    }
1081
1082    /// Defragment etcd storage.
1083    ///
1084    /// **Warning**: This is a resource-heavy operation.
1085    pub async fn etcd_defragment(&self) -> Result<EtcdDefragmentResponse> {
1086        let mut client = MachineServiceClient::new(self.channel.clone());
1087
1088        let response = client.etcd_defragment(()).await?;
1089        let inner = response.into_inner();
1090
1091        Ok(EtcdDefragmentResponse::from(inner))
1092    }
1093
1094    // =========================================================================
1095    // Diagnostics
1096    // =========================================================================
1097
1098    /// Get kernel message buffer (dmesg).
1099    ///
1100    /// This is a server-streaming RPC that returns kernel messages.
1101    ///
1102    /// # Example
1103    ///
1104    /// ```ignore
1105    /// use talos_api_rs::{TalosClient, TalosClientConfig, DmesgRequest};
1106    ///
1107    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1108    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
1109    /// let client = TalosClient::new(config).await?;
1110    ///
1111    /// let dmesg = client.dmesg(DmesgRequest::new()).await?;
1112    /// println!("{}", dmesg.as_string_lossy());
1113    /// # Ok(())
1114    /// # }
1115    /// ```
1116    pub async fn dmesg(&self, request: DmesgRequest) -> Result<DmesgResponse> {
1117        use tonic::codegen::tokio_stream::StreamExt;
1118
1119        let mut client = MachineServiceClient::new(self.channel.clone());
1120
1121        let proto_request: ProtoDmesgRequest = request.into();
1122        let response = client.dmesg(proto_request).await?;
1123        let mut stream = response.into_inner();
1124
1125        let mut data = Vec::new();
1126        let mut node = None;
1127
1128        while let Some(chunk) = stream.next().await {
1129            let chunk = chunk?;
1130            if node.is_none() {
1131                if let Some(metadata) = &chunk.metadata {
1132                    node = Some(metadata.hostname.clone());
1133                }
1134            }
1135            data.extend(chunk.bytes);
1136        }
1137
1138        Ok(DmesgResponse::new(data, node))
1139    }
1140
1141    // =========================================================================
1142    // Upgrade
1143    // =========================================================================
1144
1145    /// Upgrade a Talos node to a new version.
1146    ///
1147    /// # Example
1148    ///
1149    /// ```ignore
1150    /// use talos_api_rs::{TalosClient, TalosClientConfig, UpgradeRequest};
1151    ///
1152    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1153    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
1154    /// let client = TalosClient::new(config).await?;
1155    ///
1156    /// // Upgrade to a specific version
1157    /// let response = client.upgrade(
1158    ///     UpgradeRequest::new("ghcr.io/siderolabs/installer:v1.6.0")
1159    /// ).await?;
1160    ///
1161    /// // Staged upgrade (downloads but doesn't apply until reboot)
1162    /// let response = client.upgrade(
1163    ///     UpgradeRequest::builder("ghcr.io/siderolabs/installer:v1.6.0")
1164    ///         .stage(true)
1165    ///         .preserve(true)
1166    ///         .build()
1167    /// ).await?;
1168    /// # Ok(())
1169    /// # }
1170    /// ```
1171    pub async fn upgrade(&self, request: UpgradeRequest) -> Result<UpgradeResponse> {
1172        let mut client = MachineServiceClient::new(self.channel.clone());
1173
1174        let proto_request: ProtoUpgradeRequest = request.into();
1175        let response = client.upgrade(proto_request).await?;
1176        let inner = response.into_inner();
1177
1178        Ok(UpgradeResponse::from(inner))
1179    }
1180
1181    // =========================================================================
1182    // Service Management
1183    // =========================================================================
1184
1185    /// Start a service.
1186    pub async fn service_start(
1187        &self,
1188        request: ServiceStartRequest,
1189    ) -> Result<ServiceStartResponse> {
1190        let mut client = MachineServiceClient::new(self.channel.clone());
1191
1192        let proto_request: ProtoServiceStartRequest = request.into();
1193        let response = client.service_start(proto_request).await?;
1194        let inner = response.into_inner();
1195
1196        Ok(ServiceStartResponse::from(inner))
1197    }
1198
1199    /// Stop a service.
1200    pub async fn service_stop(&self, request: ServiceStopRequest) -> Result<ServiceStopResponse> {
1201        let mut client = MachineServiceClient::new(self.channel.clone());
1202
1203        let proto_request: ProtoServiceStopRequest = request.into();
1204        let response = client.service_stop(proto_request).await?;
1205        let inner = response.into_inner();
1206
1207        Ok(ServiceStopResponse::from(inner))
1208    }
1209
1210    /// Restart a service.
1211    pub async fn service_restart(
1212        &self,
1213        request: ServiceRestartRequest,
1214    ) -> Result<ServiceRestartResponse> {
1215        let mut client = MachineServiceClient::new(self.channel.clone());
1216
1217        let proto_request: ProtoServiceRestartRequest = request.into();
1218        let response = client.service_restart(proto_request).await?;
1219        let inner = response.into_inner();
1220
1221        Ok(ServiceRestartResponse::from(inner))
1222    }
1223
1224    /// Get service/container logs (server-streaming).
1225    pub async fn logs(&self, request: LogsRequest) -> Result<LogsResponse> {
1226        use tonic::codegen::tokio_stream::StreamExt;
1227
1228        let mut client = MachineServiceClient::new(self.channel.clone());
1229
1230        let proto_request: ProtoLogsRequest = request.into();
1231        let response = client.logs(proto_request).await?;
1232        let mut stream = response.into_inner();
1233
1234        let mut data = Vec::new();
1235        let mut node = None;
1236
1237        while let Some(chunk) = stream.next().await {
1238            let chunk = chunk?;
1239            if node.is_none() {
1240                if let Some(metadata) = &chunk.metadata {
1241                    node = Some(metadata.hostname.clone());
1242                }
1243            }
1244            data.extend(chunk.bytes);
1245        }
1246
1247        Ok(LogsResponse::new(data, node))
1248    }
1249
1250    // =========================================================================
1251    // System Information
1252    // =========================================================================
1253
1254    /// Get system load averages.
1255    pub async fn load_avg(&self) -> Result<LoadAvgResponse> {
1256        let mut client = MachineServiceClient::new(self.channel.clone());
1257
1258        let response = client.load_avg(()).await?;
1259        let inner = response.into_inner();
1260
1261        Ok(LoadAvgResponse::from(inner))
1262    }
1263
1264    /// Get memory information.
1265    pub async fn memory(&self) -> Result<MemoryResponse> {
1266        let mut client = MachineServiceClient::new(self.channel.clone());
1267
1268        let response = client.memory(()).await?;
1269        let inner = response.into_inner();
1270
1271        Ok(MemoryResponse::from(inner))
1272    }
1273
1274    /// Get CPU information.
1275    pub async fn cpu_info(&self) -> Result<CpuInfoResponse> {
1276        let mut client = MachineServiceClient::new(self.channel.clone());
1277
1278        let response = client.cpu_info(()).await?;
1279        let inner = response.into_inner();
1280
1281        Ok(CpuInfoResponse::from(inner))
1282    }
1283
1284    /// Get disk statistics.
1285    pub async fn disk_stats(&self) -> Result<DiskStatsResponse> {
1286        let mut client = MachineServiceClient::new(self.channel.clone());
1287
1288        let response = client.disk_stats(()).await?;
1289        let inner = response.into_inner();
1290
1291        Ok(DiskStatsResponse::from(inner))
1292    }
1293
1294    /// Get network device statistics.
1295    pub async fn network_device_stats(&self) -> Result<NetworkDeviceStatsResponse> {
1296        let mut client = MachineServiceClient::new(self.channel.clone());
1297
1298        let response = client.network_device_stats(()).await?;
1299        let inner = response.into_inner();
1300
1301        Ok(NetworkDeviceStatsResponse::from(inner))
1302    }
1303
1304    /// Get mount points.
1305    pub async fn mounts(&self) -> Result<MountsResponse> {
1306        let mut client = MachineServiceClient::new(self.channel.clone());
1307
1308        let response = client.mounts(()).await?;
1309        let inner = response.into_inner();
1310
1311        Ok(MountsResponse::from(inner))
1312    }
1313
1314    /// Get process list.
1315    pub async fn processes(&self) -> Result<ProcessesResponse> {
1316        let mut client = MachineServiceClient::new(self.channel.clone());
1317
1318        let response = client.processes(()).await?;
1319        let inner = response.into_inner();
1320
1321        Ok(ProcessesResponse::from(inner))
1322    }
1323
1324    // =========================================================================
1325    // File Operations
1326    // =========================================================================
1327
1328    /// List directory contents (server-streaming).
1329    pub async fn list(&self, request: ListRequest) -> Result<ListResponse> {
1330        use tonic::codegen::tokio_stream::StreamExt;
1331
1332        let mut client = MachineServiceClient::new(self.channel.clone());
1333
1334        let proto_request: ProtoListRequest = request.into();
1335        let response = client.list(proto_request).await?;
1336        let mut stream = response.into_inner();
1337
1338        let mut entries = Vec::new();
1339        while let Some(info) = stream.next().await {
1340            let info = info?;
1341            entries.push(FileInfo::from(info));
1342        }
1343
1344        Ok(ListResponse::new(entries))
1345    }
1346
1347    /// Read a file (server-streaming).
1348    pub async fn read(&self, request: ReadRequest) -> Result<ReadResponse> {
1349        use tonic::codegen::tokio_stream::StreamExt;
1350
1351        let mut client = MachineServiceClient::new(self.channel.clone());
1352
1353        let proto_request: ProtoReadRequest = request.into();
1354        let response = client.read(proto_request).await?;
1355        let mut stream = response.into_inner();
1356
1357        let mut data = Vec::new();
1358        let mut node = None;
1359
1360        while let Some(chunk) = stream.next().await {
1361            let chunk = chunk?;
1362            if node.is_none() {
1363                if let Some(metadata) = &chunk.metadata {
1364                    node = Some(metadata.hostname.clone());
1365                }
1366            }
1367            data.extend(chunk.bytes);
1368        }
1369
1370        Ok(ReadResponse::new(data, node))
1371    }
1372
1373    /// Copy a file or directory as tar archive (server-streaming).
1374    pub async fn copy(&self, request: CopyRequest) -> Result<CopyResponse> {
1375        use tonic::codegen::tokio_stream::StreamExt;
1376
1377        let mut client = MachineServiceClient::new(self.channel.clone());
1378
1379        let proto_request: ProtoCopyRequest = request.into();
1380        let response = client.copy(proto_request).await?;
1381        let mut stream = response.into_inner();
1382
1383        let mut data = Vec::new();
1384        let mut node = None;
1385
1386        while let Some(chunk) = stream.next().await {
1387            let chunk = chunk?;
1388            if node.is_none() {
1389                if let Some(metadata) = &chunk.metadata {
1390                    node = Some(metadata.hostname.clone());
1391                }
1392            }
1393            data.extend(chunk.bytes);
1394        }
1395
1396        Ok(CopyResponse::new(data, node))
1397    }
1398
1399    /// Get disk usage (server-streaming).
1400    pub async fn disk_usage(&self, request: DiskUsageRequest) -> Result<DiskUsageResponse> {
1401        use tonic::codegen::tokio_stream::StreamExt;
1402
1403        let mut client = MachineServiceClient::new(self.channel.clone());
1404
1405        let proto_request: ProtoDiskUsageRequest = request.into();
1406        let response = client.disk_usage(proto_request).await?;
1407        let mut stream = response.into_inner();
1408
1409        let mut entries = Vec::new();
1410        while let Some(info) = stream.next().await {
1411            let info = info?;
1412            entries.push(DiskUsageInfo::from(info));
1413        }
1414
1415        Ok(DiskUsageResponse::new(entries))
1416    }
1417
1418    // =========================================================================
1419    // Advanced APIs
1420    // =========================================================================
1421
1422    /// Rollback a Talos node to the previous installed version.
1423    pub async fn rollback(&self) -> Result<RollbackResponse> {
1424        let mut client = MachineServiceClient::new(self.channel.clone());
1425
1426        let response = client.rollback(ProtoRollbackRequest {}).await?;
1427        let inner = response.into_inner();
1428
1429        Ok(RollbackResponse::from(inner))
1430    }
1431
1432    /// Generate client configuration (talosconfig).
1433    pub async fn generate_client_configuration(
1434        &self,
1435        request: GenerateClientConfigurationRequest,
1436    ) -> Result<GenerateClientConfigurationResponse> {
1437        let mut client = MachineServiceClient::new(self.channel.clone());
1438
1439        let proto_request: ProtoGenerateClientConfigRequest = request.into();
1440        let response = client.generate_client_configuration(proto_request).await?;
1441        let inner = response.into_inner();
1442
1443        Ok(GenerateClientConfigurationResponse::from(inner))
1444    }
1445
1446    /// Capture packets on a network interface (server-streaming).
1447    pub async fn packet_capture(
1448        &self,
1449        request: PacketCaptureRequest,
1450    ) -> Result<PacketCaptureResponse> {
1451        use tonic::codegen::tokio_stream::StreamExt;
1452
1453        let mut client = MachineServiceClient::new(self.channel.clone());
1454
1455        let proto_request: ProtoPacketCaptureRequest = request.into();
1456        let response = client.packet_capture(proto_request).await?;
1457        let mut stream = response.into_inner();
1458
1459        let mut data = Vec::new();
1460        let mut node = None;
1461
1462        while let Some(chunk) = stream.next().await {
1463            let chunk = chunk?;
1464            if node.is_none() {
1465                if let Some(metadata) = &chunk.metadata {
1466                    node = Some(metadata.hostname.clone());
1467                }
1468            }
1469            data.extend(chunk.bytes);
1470        }
1471
1472        Ok(PacketCaptureResponse::new(data, node))
1473    }
1474
1475    /// Get network connection information (netstat).
1476    pub async fn netstat(&self, request: NetstatRequest) -> Result<NetstatResponse> {
1477        let mut client = MachineServiceClient::new(self.channel.clone());
1478
1479        let proto_request: ProtoNetstatRequest = request.into();
1480        let response = client.netstat(proto_request).await?;
1481        let inner = response.into_inner();
1482
1483        Ok(NetstatResponse::from(inner))
1484    }
1485}
1486
1487// Helper for insecure mode
1488#[derive(Debug)]
1489struct NoVerifier;
1490
1491impl rustls::client::danger::ServerCertVerifier for NoVerifier {
1492    fn verify_server_cert(
1493        &self,
1494        _end_entity: &rustls::pki_types::CertificateDer<'_>,
1495        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
1496        _server_name: &rustls::pki_types::ServerName<'_>,
1497        _ocsp_response: &[u8],
1498        _now: rustls::pki_types::UnixTime,
1499    ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
1500        Ok(rustls::client::danger::ServerCertVerified::assertion())
1501    }
1502
1503    fn verify_tls12_signature(
1504        &self,
1505        _message: &[u8],
1506        _cert: &rustls::pki_types::CertificateDer<'_>,
1507        _dss: &rustls::DigitallySignedStruct,
1508    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1509        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1510    }
1511
1512    fn verify_tls13_signature(
1513        &self,
1514        _message: &[u8],
1515        _cert: &rustls::pki_types::CertificateDer<'_>,
1516        _dss: &rustls::DigitallySignedStruct,
1517    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1518        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1519    }
1520
1521    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1522        vec![
1523            rustls::SignatureScheme::RSA_PKCS1_SHA1,
1524            rustls::SignatureScheme::ECDSA_SHA1_Legacy,
1525            rustls::SignatureScheme::RSA_PKCS1_SHA256,
1526            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
1527            rustls::SignatureScheme::RSA_PKCS1_SHA384,
1528            rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
1529            rustls::SignatureScheme::RSA_PKCS1_SHA512,
1530            rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
1531            rustls::SignatureScheme::RSA_PSS_SHA256,
1532            rustls::SignatureScheme::RSA_PSS_SHA384,
1533            rustls::SignatureScheme::RSA_PSS_SHA512,
1534            rustls::SignatureScheme::ED25519,
1535            rustls::SignatureScheme::ED448,
1536        ]
1537    }
1538}
1539
1540pub use pool::{ConnectionPool, ConnectionPoolConfig, EndpointHealth, HealthStatus, LoadBalancer};