Skip to main content

talos_api_rs/client/
mod.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3pub mod discovery;
4mod node_target;
5pub mod pool;
6#[cfg(test)]
7mod tests;
8
9pub use discovery::{ClusterDiscovery, ClusterHealth, ClusterMember, NodeHealth, NodeRole};
10pub use node_target::{NodeTarget, NODE_METADATA_KEY};
11
12use crate::api::machine::machine_service_client::MachineServiceClient;
13use crate::api::machine::ApplyConfigurationRequest as ProtoApplyConfigRequest;
14use crate::api::machine::BootstrapRequest as ProtoBootstrapRequest;
15use crate::api::machine::CopyRequest as ProtoCopyRequest;
16use crate::api::machine::DiskUsageRequest as ProtoDiskUsageRequest;
17use crate::api::machine::DmesgRequest as ProtoDmesgRequest;
18use crate::api::machine::EtcdForfeitLeadershipRequest as ProtoEtcdForfeitLeadershipRequest;
19use crate::api::machine::EtcdLeaveClusterRequest as ProtoEtcdLeaveClusterRequest;
20use crate::api::machine::EtcdMemberListRequest as ProtoEtcdMemberListRequest;
21use crate::api::machine::EtcdRemoveMemberByIdRequest as ProtoEtcdRemoveMemberByIdRequest;
22use crate::api::machine::GenerateClientConfigurationRequest as ProtoGenerateClientConfigRequest;
23use crate::api::machine::ImageListRequest as ProtoImageListRequest;
24use crate::api::machine::ImagePullRequest as ProtoImagePullRequest;
25use crate::api::machine::ListRequest as ProtoListRequest;
26use crate::api::machine::LogsRequest as ProtoLogsRequest;
27use crate::api::machine::NetstatRequest as ProtoNetstatRequest;
28use crate::api::machine::PacketCaptureRequest as ProtoPacketCaptureRequest;
29use crate::api::machine::ReadRequest as ProtoReadRequest;
30use crate::api::machine::ResetRequest as ProtoResetRequest;
31use crate::api::machine::RollbackRequest as ProtoRollbackRequest;
32use crate::api::machine::ServiceRestartRequest as ProtoServiceRestartRequest;
33use crate::api::machine::ServiceStartRequest as ProtoServiceStartRequest;
34use crate::api::machine::ServiceStopRequest as ProtoServiceStopRequest;
35use crate::api::machine::UpgradeRequest as ProtoUpgradeRequest;
36use crate::api::version::version_service_client::VersionServiceClient;
37use crate::error::Result;
38use crate::resources::{
39    ApplyConfigurationRequest, ApplyConfigurationResponse, BootstrapRequest, BootstrapResponse,
40    CopyRequest, CopyResponse, CpuInfoResponse, DiskStatsResponse, DiskUsageInfo, DiskUsageRequest,
41    DiskUsageResponse, DmesgRequest, DmesgResponse, EtcdAlarmDisarmResponse, EtcdAlarmListResponse,
42    EtcdDefragmentResponse, EtcdForfeitLeadershipRequest, EtcdForfeitLeadershipResponse,
43    EtcdLeaveClusterRequest, EtcdLeaveClusterResponse, EtcdMemberListRequest,
44    EtcdMemberListResponse, EtcdRemoveMemberByIdRequest, EtcdRemoveMemberByIdResponse,
45    EtcdStatusResponse, FileInfo, GenerateClientConfigurationRequest,
46    GenerateClientConfigurationResponse, ImageInfo, ImageListRequest, ImagePullRequest,
47    ImagePullResponse, KubeconfigResponse, ListRequest, ListResponse, LoadAvgResponse, LogsRequest,
48    LogsResponse, MemoryResponse, MountsResponse, NetstatRequest, NetstatResponse,
49    NetworkDeviceStatsResponse, PacketCaptureRequest, PacketCaptureResponse, ProcessesResponse,
50    ReadRequest, ReadResponse, ResetRequest, ResetResponse, RollbackResponse,
51    ServiceRestartRequest, ServiceRestartResponse, ServiceStartRequest, ServiceStartResponse,
52    ServiceStopRequest, ServiceStopResponse, UpgradeRequest, UpgradeResponse,
53};
54use hyper_util::rt::TokioIo;
55use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName};
56use std::sync::Arc;
57use std::time::Duration;
58use tonic::transport::{Channel, Endpoint};
59
60/// Configuration for the Talos API client.
61#[derive(Clone, Debug)]
62pub struct TalosClientConfig {
63    /// The gRPC endpoint URL.
64    pub endpoint: String,
65    /// Path to client certificate.
66    pub crt_path: Option<String>,
67    /// Path to client private key.
68    pub key_path: Option<String>,
69    /// Path to CA certificate.
70    pub ca_path: Option<String>,
71    /// If true, skips TLS verification (insecure).
72    pub insecure: bool,
73    /// Connection timeout for establishing the gRPC channel.
74    pub connect_timeout: Option<Duration>,
75    /// Request timeout for individual RPC calls.
76    pub request_timeout: Option<Duration>,
77    /// Keepalive interval for long-running connections.
78    pub keepalive_interval: Option<Duration>,
79    /// Keepalive timeout.
80    pub keepalive_timeout: Option<Duration>,
81}
82
83impl Default for TalosClientConfig {
84    fn default() -> Self {
85        Self {
86            endpoint: "https://127.0.0.1:50000".to_string(),
87            crt_path: None,
88            key_path: None,
89            ca_path: None,
90            insecure: false,
91            connect_timeout: Some(Duration::from_secs(10)),
92            request_timeout: Some(Duration::from_secs(30)),
93            keepalive_interval: Some(Duration::from_secs(30)),
94            keepalive_timeout: Some(Duration::from_secs(10)),
95        }
96    }
97}
98
99impl TalosClientConfig {
100    /// Create a new configuration with an endpoint.
101    #[must_use]
102    pub fn new(endpoint: impl Into<String>) -> Self {
103        Self {
104            endpoint: endpoint.into(),
105            ..Default::default()
106        }
107    }
108
109    /// Create a builder for more complex configuration.
110    #[must_use]
111    pub fn builder(endpoint: impl Into<String>) -> TalosClientConfigBuilder {
112        TalosClientConfigBuilder::new(endpoint)
113    }
114
115    /// Set client certificate path.
116    #[must_use]
117    pub fn with_client_cert(mut self, crt_path: impl Into<String>) -> Self {
118        self.crt_path = Some(crt_path.into());
119        self
120    }
121
122    /// Set client key path.
123    #[must_use]
124    pub fn with_client_key(mut self, key_path: impl Into<String>) -> Self {
125        self.key_path = Some(key_path.into());
126        self
127    }
128
129    /// Set CA certificate path.
130    #[must_use]
131    pub fn with_ca(mut self, ca_path: impl Into<String>) -> Self {
132        self.ca_path = Some(ca_path.into());
133        self
134    }
135
136    /// Enable insecure mode (skip TLS verification).
137    #[must_use]
138    pub fn insecure(mut self) -> Self {
139        self.insecure = true;
140        self
141    }
142
143    /// Set connect timeout.
144    #[must_use]
145    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
146        self.connect_timeout = Some(timeout);
147        self
148    }
149
150    /// Set request timeout.
151    #[must_use]
152    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
153        self.request_timeout = Some(timeout);
154        self
155    }
156
157    /// Disable all timeouts.
158    #[must_use]
159    pub fn no_timeout(mut self) -> Self {
160        self.connect_timeout = None;
161        self.request_timeout = None;
162        self
163    }
164}
165
166/// Builder for `TalosClientConfig`.
167#[derive(Debug, Clone)]
168pub struct TalosClientConfigBuilder {
169    endpoint: String,
170    crt_path: Option<String>,
171    key_path: Option<String>,
172    ca_path: Option<String>,
173    insecure: bool,
174    connect_timeout: Option<Duration>,
175    request_timeout: Option<Duration>,
176    keepalive_interval: Option<Duration>,
177    keepalive_timeout: Option<Duration>,
178}
179
180impl TalosClientConfigBuilder {
181    /// Create a new builder.
182    #[must_use]
183    pub fn new(endpoint: impl Into<String>) -> Self {
184        Self {
185            endpoint: endpoint.into(),
186            crt_path: None,
187            key_path: None,
188            ca_path: None,
189            insecure: false,
190            connect_timeout: Some(Duration::from_secs(10)),
191            request_timeout: Some(Duration::from_secs(30)),
192            keepalive_interval: Some(Duration::from_secs(30)),
193            keepalive_timeout: Some(Duration::from_secs(10)),
194        }
195    }
196
197    /// Set client certificate path.
198    #[must_use]
199    pub fn client_cert(mut self, path: impl Into<String>) -> Self {
200        self.crt_path = Some(path.into());
201        self
202    }
203
204    /// Set client key path.
205    #[must_use]
206    pub fn client_key(mut self, path: impl Into<String>) -> Self {
207        self.key_path = Some(path.into());
208        self
209    }
210
211    /// Set CA certificate path.
212    #[must_use]
213    pub fn ca_cert(mut self, path: impl Into<String>) -> Self {
214        self.ca_path = Some(path.into());
215        self
216    }
217
218    /// Enable insecure mode.
219    #[must_use]
220    pub fn insecure(mut self) -> Self {
221        self.insecure = true;
222        self
223    }
224
225    /// Set connect timeout.
226    #[must_use]
227    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
228        self.connect_timeout = Some(timeout);
229        self
230    }
231
232    /// Set request timeout.
233    #[must_use]
234    pub fn request_timeout(mut self, timeout: Duration) -> Self {
235        self.request_timeout = Some(timeout);
236        self
237    }
238
239    /// Set keepalive settings.
240    #[must_use]
241    pub fn keepalive(mut self, interval: Duration, timeout: Duration) -> Self {
242        self.keepalive_interval = Some(interval);
243        self.keepalive_timeout = Some(timeout);
244        self
245    }
246
247    /// Disable timeouts.
248    #[must_use]
249    pub fn no_timeout(mut self) -> Self {
250        self.connect_timeout = None;
251        self.request_timeout = None;
252        self
253    }
254
255    /// Build the configuration.
256    #[must_use]
257    pub fn build(self) -> TalosClientConfig {
258        TalosClientConfig {
259            endpoint: self.endpoint,
260            crt_path: self.crt_path,
261            key_path: self.key_path,
262            ca_path: self.ca_path,
263            insecure: self.insecure,
264            connect_timeout: self.connect_timeout,
265            request_timeout: self.request_timeout,
266            keepalive_interval: self.keepalive_interval,
267            keepalive_timeout: self.keepalive_timeout,
268        }
269    }
270}
271
/// Client handle for the Talos gRPC API.
///
/// Holds the configuration it was created from, the gRPC channel from which
/// the per-service clients are created, and the node target that is applied
/// to outgoing requests.
#[derive(Clone)]
pub struct TalosClient {
    /// Client configuration (retained for debugging and introspection)
    #[allow(dead_code)]
    config: TalosClientConfig,
    /// gRPC channel; cloned each time a service client is created
    channel: Channel,
    /// Current node target for API calls
    node_target: NodeTarget,
}
281
282impl TalosClient {
283    pub async fn new(config: TalosClientConfig) -> Result<Self> {
284        // Install ring as default crypto provider (supports ED25519)
285        let _ = rustls::crypto::ring::default_provider().install_default();
286
287        // Check if using plain HTTP (no TLS)
288        let is_http = config.endpoint.starts_with("http://");
289
290        let channel = if is_http {
291            // Plain HTTP - no TLS at all
292            Self::create_http_channel(&config).await?
293        } else if config.insecure {
294            Self::create_insecure_channel(&config).await?
295        } else {
296            Self::create_mtls_channel(&config).await?
297        };
298
299        Ok(Self {
300            config,
301            channel,
302            node_target: NodeTarget::Default,
303        })
304    }
305
306    /// Create a client from a TalosConfig context
307    ///
308    /// This loads credentials from the talosconfig and connects to the first endpoint.
309    ///
310    /// # Example
311    ///
312    /// ```no_run
313    /// use talos_api_rs::{TalosClient, config::TalosConfig};
314    ///
315    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
316    /// let config = TalosConfig::load_with_env()?;
317    /// let client = TalosClient::from_talosconfig(&config, None).await?;
318    /// # Ok(())
319    /// # }
320    /// ```
321    ///
322    /// # Arguments
323    ///
324    /// * `config` - The loaded TalosConfig
325    /// * `context_name` - Optional context name to use (defaults to active context)
326    pub async fn from_talosconfig(
327        config: &crate::config::TalosConfig,
328        context_name: Option<&str>,
329    ) -> Result<Self> {
330        let context = if let Some(name) = context_name {
331            config.get_context(name).ok_or_else(|| {
332                crate::error::TalosError::Config(format!("Context '{}' not found", name))
333            })?
334        } else {
335            config.active_context().ok_or_else(|| {
336                crate::error::TalosError::Config("No active context in talosconfig".to_string())
337            })?
338        };
339
340        let endpoint = context.first_endpoint().ok_or_else(|| {
341            crate::error::TalosError::Config("No endpoints in context".to_string())
342        })?;
343
344        // Build endpoint URL
345        let endpoint_url = if endpoint.contains("://") {
346            endpoint.clone()
347        } else if endpoint.contains(':') {
348            format!("https://{}", endpoint)
349        } else {
350            format!("https://{}:50000", endpoint)
351        };
352
353        // Build client config
354        let mut client_config = TalosClientConfig::new(&endpoint_url);
355
356        // Write certs to temp files if provided inline
357        if let (Some(ca), Some(crt), Some(key)) = (&context.ca, &context.crt, &context.key) {
358            let temp_dir = std::env::temp_dir().join("talos-api-rs");
359            std::fs::create_dir_all(&temp_dir).map_err(|e| {
360                crate::error::TalosError::Config(format!("Failed to create temp dir: {}", e))
361            })?;
362
363            let ca_path = temp_dir.join("ca.crt");
364            let crt_path = temp_dir.join("client.crt");
365            let key_path = temp_dir.join("client.key");
366
367            std::fs::write(&ca_path, ca).map_err(|e| {
368                crate::error::TalosError::Config(format!("Failed to write CA cert: {}", e))
369            })?;
370            std::fs::write(&crt_path, crt).map_err(|e| {
371                crate::error::TalosError::Config(format!("Failed to write client cert: {}", e))
372            })?;
373            std::fs::write(&key_path, key).map_err(|e| {
374                crate::error::TalosError::Config(format!("Failed to write client key: {}", e))
375            })?;
376
377            client_config = client_config
378                .with_ca(ca_path.to_string_lossy().to_string())
379                .with_client_cert(crt_path.to_string_lossy().to_string())
380                .with_client_key(key_path.to_string_lossy().to_string());
381        }
382
383        let mut client = Self::new(client_config).await?;
384
385        // Set node target from context if available
386        if let Some(nodes) = &context.nodes {
387            if !nodes.is_empty() {
388                client.node_target = NodeTarget::from(nodes.clone());
389            }
390        }
391
392        Ok(client)
393    }
394
395    /// Create a new client targeting a specific node
396    ///
397    /// # Example
398    ///
399    /// ```ignore
400    /// use talos_api_rs::{TalosClient, TalosClientConfig};
401    /// use talos_api_rs::client::NodeTarget;
402    ///
403    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
404    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
405    ///
406    /// // Target a specific node for API calls
407    /// let targeted = client.with_node(NodeTarget::single("192.168.1.10"));
408    /// let hostname = targeted.hostname().await?;
409    /// # Ok(())
410    /// # }
411    /// ```
412    #[must_use]
413    pub fn with_node(&self, target: NodeTarget) -> Self {
414        Self {
415            config: self.config.clone(),
416            channel: self.channel.clone(),
417            node_target: target,
418        }
419    }
420
421    /// Create a new client targeting multiple nodes
422    ///
423    /// Convenience method for cluster-wide operations.
424    #[must_use]
425    pub fn with_nodes(&self, nodes: impl IntoIterator<Item = impl Into<String>>) -> Self {
426        self.with_node(NodeTarget::multiple(nodes))
427    }
428
429    /// Get the current node target
430    #[must_use]
431    pub fn node_target(&self) -> &NodeTarget {
432        &self.node_target
433    }
434
435    /// Create a plain HTTP channel (no TLS)
436    async fn create_http_channel(config: &TalosClientConfig) -> Result<Channel> {
437        let mut endpoint = Channel::from_shared(config.endpoint.clone())
438            .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
439
440        // Apply timeout configuration
441        if let Some(timeout) = config.connect_timeout {
442            endpoint = endpoint.connect_timeout(timeout);
443        }
444        if let Some(timeout) = config.request_timeout {
445            endpoint = endpoint.timeout(timeout);
446        }
447        if let Some(interval) = config.keepalive_interval {
448            if let Some(ka_timeout) = config.keepalive_timeout {
449                endpoint = endpoint
450                    .http2_keep_alive_interval(interval)
451                    .keep_alive_timeout(ka_timeout);
452            }
453        }
454
455        let channel = endpoint.connect().await?;
456        Ok(channel)
457    }
458
459    /// Create an insecure channel (TLS without certificate verification)
460    async fn create_insecure_channel(config: &TalosClientConfig) -> Result<Channel> {
461        let tls_config = rustls::ClientConfig::builder()
462            .with_root_certificates(rustls::RootCertStore::empty())
463            .with_no_client_auth();
464
465        Self::connect_with_custom_tls(config, tls_config, true).await
466    }
467
468    /// Create an mTLS channel with full certificate verification
469    async fn create_mtls_channel(config: &TalosClientConfig) -> Result<Channel> {
470        // Load CA certificate
471        let root_store = if let Some(ca_path) = &config.ca_path {
472            let ca_pem = std::fs::read(ca_path).map_err(|e| {
473                crate::error::TalosError::Config(format!("Failed to read CA cert: {e}"))
474            })?;
475            let mut root_store = rustls::RootCertStore::empty();
476            let certs = Self::load_pem_certs(&ca_pem)?;
477            for cert in certs {
478                root_store.add(cert).map_err(|e| {
479                    crate::error::TalosError::Config(format!("Failed to add CA cert: {e}"))
480                })?;
481            }
482            root_store
483        } else {
484            // Use system roots if no CA provided
485            let mut root_store = rustls::RootCertStore::empty();
486            root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
487            root_store
488        };
489
490        // Build TLS config with or without client auth
491        let tls_config =
492            if let (Some(crt_path), Some(key_path)) = (&config.crt_path, &config.key_path) {
493                // mTLS with client certificate
494                let cert_pem = std::fs::read(crt_path).map_err(|e| {
495                    crate::error::TalosError::Config(format!("Failed to read client cert: {e}"))
496                })?;
497                let key_pem = std::fs::read(key_path).map_err(|e| {
498                    crate::error::TalosError::Config(format!("Failed to read client key: {e}"))
499                })?;
500
501                let client_certs = Self::load_pem_certs(&cert_pem)?;
502                let client_key = Self::load_pem_key(&key_pem)?;
503
504                rustls::ClientConfig::builder()
505                    .with_root_certificates(root_store)
506                    .with_client_auth_cert(client_certs, client_key)
507                    .map_err(|e| {
508                        crate::error::TalosError::Config(format!(
509                            "Failed to configure client auth: {e}"
510                        ))
511                    })?
512            } else {
513                // TLS without client auth
514                rustls::ClientConfig::builder()
515                    .with_root_certificates(root_store)
516                    .with_no_client_auth()
517            };
518
519        Self::connect_with_custom_tls(config, tls_config, false).await
520    }
521
522    /// Connect using a custom rustls TLS configuration
523    async fn connect_with_custom_tls(
524        config: &TalosClientConfig,
525        mut tls_config: rustls::ClientConfig,
526        skip_verification: bool,
527    ) -> Result<Channel> {
528        // Override verifier for insecure mode
529        if skip_verification {
530            tls_config
531                .dangerous()
532                .set_certificate_verifier(Arc::new(NoVerifier));
533        }
534
535        // gRPC requires ALPN h2
536        tls_config.alpn_protocols = vec![b"h2".to_vec()];
537        let tls_config = Arc::new(tls_config);
538        let connector = tokio_rustls::TlsConnector::from(tls_config);
539
540        // Extract host for SNI
541        let endpoint_url = if config.endpoint.starts_with("http") {
542            config.endpoint.clone()
543        } else {
544            format!("https://{}", config.endpoint)
545        };
546        let parsed_url = url::Url::parse(&endpoint_url)
547            .map_err(|e| crate::error::TalosError::Config(format!("Invalid endpoint URL: {e}")))?;
548        let host = parsed_url
549            .host_str()
550            .ok_or_else(|| crate::error::TalosError::Config("No host in endpoint".to_string()))?
551            .to_string();
552        let port = parsed_url.port().unwrap_or(50000);
553
554        // For custom connector, use http:// scheme (we handle TLS ourselves)
555        let endpoint_for_connector = format!("http://{}:{}", host, port);
556
557        // Build endpoint with timeout configuration
558        let mut endpoint = Endpoint::from_shared(endpoint_for_connector)
559            .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
560
561        // Apply timeout configuration
562        if let Some(timeout) = config.connect_timeout {
563            endpoint = endpoint.connect_timeout(timeout);
564        }
565        if let Some(timeout) = config.request_timeout {
566            endpoint = endpoint.timeout(timeout);
567        }
568        if let Some(interval) = config.keepalive_interval {
569            if let Some(ka_timeout) = config.keepalive_timeout {
570                endpoint = endpoint
571                    .http2_keep_alive_interval(interval)
572                    .keep_alive_timeout(ka_timeout);
573            }
574        }
575
576        let channel = endpoint
577            .connect_with_connector(tower::service_fn(move |uri: tonic::transport::Uri| {
578                let connector = connector.clone();
579                let host = host.clone();
580                async move {
581                    let uri_host = uri.host().unwrap_or("127.0.0.1");
582                    let uri_port = uri.port_u16().unwrap_or(50000);
583                    let addr = format!("{}:{}", uri_host, uri_port);
584
585                    let tcp = tokio::net::TcpStream::connect(addr).await?;
586
587                    // Use actual hostname for SNI (important for cert verification)
588                    let server_name = ServerName::try_from(host.clone())
589                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
590
591                    let tls_stream = connector.connect(server_name, tcp).await?;
592                    Ok::<_, std::io::Error>(TokioIo::new(tls_stream))
593                }
594            }))
595            .await?;
596
597        Ok(channel)
598    }
599
600    /// Load PEM-encoded certificates
601    #[allow(clippy::result_large_err)]
602    fn load_pem_certs(pem_data: &[u8]) -> Result<Vec<CertificateDer<'static>>> {
603        let mut reader = std::io::BufReader::new(pem_data);
604        let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
605            .collect::<std::result::Result<Vec<_>, _>>()
606            .map_err(|e| {
607                crate::error::TalosError::Config(format!("Failed to parse PEM certificates: {e}"))
608            })?;
609        if certs.is_empty() {
610            return Err(crate::error::TalosError::Config(
611                "No certificates found in PEM data".to_string(),
612            ));
613        }
614        Ok(certs)
615    }
616
617    /// Load PEM-encoded private key (supports RSA, EC, PKCS8, and ED25519)
618    #[allow(clippy::result_large_err)]
619    fn load_pem_key(pem_data: &[u8]) -> Result<PrivateKeyDer<'static>> {
620        // First, try standard PEM formats via rustls_pemfile
621        let mut reader = std::io::BufReader::new(pem_data);
622
623        loop {
624            match rustls_pemfile::read_one(&mut reader) {
625                Ok(Some(rustls_pemfile::Item::Pkcs1Key(key))) => {
626                    return Ok(PrivateKeyDer::Pkcs1(key));
627                }
628                Ok(Some(rustls_pemfile::Item::Pkcs8Key(key))) => {
629                    return Ok(PrivateKeyDer::Pkcs8(key));
630                }
631                Ok(Some(rustls_pemfile::Item::Sec1Key(key))) => {
632                    return Ok(PrivateKeyDer::Sec1(key));
633                }
634                Ok(Some(_)) => {
635                    // Skip other PEM items (certificates, etc.)
636                    continue;
637                }
638                Ok(None) => {
639                    break;
640                }
641                Err(e) => {
642                    return Err(crate::error::TalosError::Config(format!(
643                        "Failed to parse PEM key: {e}"
644                    )));
645                }
646            }
647        }
648
649        // Fallback: Handle non-standard "ED25519 PRIVATE KEY" PEM label
650        // Talos uses this format, which is PKCS#8-encoded but with a custom label
651        let pem_str = std::str::from_utf8(pem_data)
652            .map_err(|e| crate::error::TalosError::Config(format!("Invalid UTF-8 in key: {e}")))?;
653
654        if pem_str.contains("-----BEGIN ED25519 PRIVATE KEY-----") {
655            // Extract the base64 content between the headers
656            let start_marker = "-----BEGIN ED25519 PRIVATE KEY-----";
657            let end_marker = "-----END ED25519 PRIVATE KEY-----";
658
659            if let Some(start) = pem_str.find(start_marker) {
660                if let Some(end) = pem_str.find(end_marker) {
661                    let base64_content = &pem_str[start + start_marker.len()..end];
662                    let base64_clean: String = base64_content
663                        .chars()
664                        .filter(|c| !c.is_whitespace())
665                        .collect();
666
667                    let der_bytes = base64::Engine::decode(
668                        &base64::engine::general_purpose::STANDARD,
669                        &base64_clean,
670                    )
671                    .map_err(|e| {
672                        crate::error::TalosError::Config(format!(
673                            "Failed to decode ED25519 key: {e}"
674                        ))
675                    })?;
676
677                    // ED25519 PRIVATE KEY is actually PKCS#8 encoded
678                    return Ok(PrivateKeyDer::Pkcs8(
679                        rustls::pki_types::PrivatePkcs8KeyDer::from(der_bytes),
680                    ));
681                }
682            }
683        }
684
685        Err(crate::error::TalosError::Config(
686            "No private key found in PEM data".to_string(),
687        ))
688    }
689
690    /// Access the Version API group
691    pub fn version(&self) -> VersionServiceClient<Channel> {
692        VersionServiceClient::new(self.channel.clone())
693    }
694
695    /// Access the Machine API group
696    pub fn machine(&self) -> MachineServiceClient<Channel> {
697        MachineServiceClient::new(self.channel.clone())
698    }
699
700    /// Create a gRPC request with node targeting applied
701    fn make_request<T>(&self, inner: T) -> tonic::Request<T> {
702        self.node_target
703            .apply_to_request(tonic::Request::new(inner))
704    }
705
706    // ========================================================================
707    // High-level convenience methods
708    // ========================================================================
709
710    /// Apply a configuration to the node.
711    ///
712    /// This is a high-level wrapper around the `MachineService::ApplyConfiguration` RPC.
713    ///
714    /// # Example
715    ///
716    /// ```ignore
717    /// use talos_api_rs::{TalosClient, TalosClientConfig, ApplyConfigurationRequest, ApplyMode};
718    ///
719    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
720    /// let client = TalosClient::new(TalosClientConfig {
721    ///     endpoint: "https://192.168.1.100:50000".to_string(),
722    ///     insecure: true,
723    ///     ..Default::default()
724    /// }).await?;
725    ///
726    /// // Apply configuration in dry-run mode
727    /// let request = ApplyConfigurationRequest::builder()
728    ///     .config_yaml("machine:\n  type: worker")
729    ///     .mode(ApplyMode::NoReboot)
730    ///     .dry_run(true)
731    ///     .build();
732    ///
733    /// let response = client.apply_configuration(request).await?;
734    /// println!("Warnings: {:?}", response.all_warnings());
735    /// # Ok(())
736    /// # }
737    /// ```
738    ///
739    /// # Errors
740    ///
741    /// Returns an error if the RPC call fails or the configuration is invalid.
742    pub async fn apply_configuration(
743        &self,
744        request: ApplyConfigurationRequest,
745    ) -> Result<ApplyConfigurationResponse> {
746        let proto_request: ProtoApplyConfigRequest = request.into();
747        let grpc_request = self.make_request(proto_request);
748        let response = self
749            .machine()
750            .apply_configuration(grpc_request)
751            .await?
752            .into_inner();
753        Ok(response.into())
754    }
755
756    /// Apply a YAML configuration string to the node.
757    ///
758    /// Convenience method for simple configuration application.
759    ///
760    /// # Example
761    ///
762    /// ```ignore
763    /// use talos_api_rs::{TalosClient, TalosClientConfig, ApplyMode};
764    ///
765    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
766    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
767    /// let config_yaml = std::fs::read_to_string("machine.yaml")?;
768    /// let response = client.apply_configuration_yaml(&config_yaml, ApplyMode::Auto, false).await?;
769    /// # Ok(())
770    /// # }
771    /// ```
772    pub async fn apply_configuration_yaml(
773        &self,
774        yaml: &str,
775        mode: crate::ApplyMode,
776        dry_run: bool,
777    ) -> Result<ApplyConfigurationResponse> {
778        let request = ApplyConfigurationRequest::builder()
779            .config_yaml(yaml)
780            .mode(mode)
781            .dry_run(dry_run)
782            .build();
783        self.apply_configuration(request).await
784    }
785
786    /// Bootstrap the etcd cluster on this node.
787    ///
788    /// This initializes a new etcd cluster. **This should only be called ONCE**
789    /// on the first control-plane node when creating a new Talos cluster.
790    ///
791    /// # Example
792    ///
793    /// ```ignore
794    /// use talos_api_rs::{TalosClient, TalosClientConfig, BootstrapRequest};
795    ///
796    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
797    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
798    ///
799    /// // Bootstrap a new cluster
800    /// let response = client.bootstrap(BootstrapRequest::new()).await?;
801    /// println!("Bootstrap complete: {:?}", response.first());
802    /// # Ok(())
803    /// # }
804    /// ```
805    ///
806    /// # Recovery
807    ///
808    /// To recover from an etcd snapshot (uploaded via `EtcdRecover` RPC):
809    ///
810    /// ```ignore
811    /// use talos_api_rs::{TalosClient, TalosClientConfig, BootstrapRequest};
812    ///
813    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
814    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
815    /// let response = client.bootstrap(BootstrapRequest::recovery()).await?;
816    /// # Ok(())
817    /// # }
818    /// ```
819    ///
820    /// # Errors
821    ///
822    /// Returns an error if:
823    /// - The node is not a control-plane node
824    /// - etcd is already bootstrapped
825    /// - Network/connection issues
826    pub async fn bootstrap(&self, request: BootstrapRequest) -> Result<BootstrapResponse> {
827        let proto_request: ProtoBootstrapRequest = request.into();
828        let grpc_request = self.make_request(proto_request);
829        let response = self.machine().bootstrap(grpc_request).await?.into_inner();
830        Ok(response.into())
831    }
832
833    /// Bootstrap a new etcd cluster (convenience method).
834    ///
835    /// Equivalent to `bootstrap(BootstrapRequest::new())`.
836    pub async fn bootstrap_cluster(&self) -> Result<BootstrapResponse> {
837        self.bootstrap(BootstrapRequest::new()).await
838    }
839
840    /// Retrieve the kubeconfig from the cluster.
841    ///
842    /// This is a server-streaming RPC that retrieves the kubeconfig file
843    /// from a control-plane node. The kubeconfig can be used to access
844    /// the Kubernetes API of the cluster.
845    ///
846    /// # Example
847    ///
848    /// ```ignore
849    /// use talos_api_rs::{TalosClient, TalosClientConfig};
850    ///
851    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
852    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
853    ///
854    /// // Get kubeconfig
855    /// let kubeconfig = client.kubeconfig().await?;
856    /// println!("Kubeconfig from node: {:?}", kubeconfig.node);
857    ///
858    /// // Write to file
859    /// kubeconfig.write_to_file("kubeconfig.yaml")?;
860    ///
861    /// // Or get as string
862    /// let yaml = kubeconfig.as_str()?;
863    /// println!("{}", yaml);
864    /// # Ok(())
865    /// # }
866    /// ```
867    ///
868    /// # Errors
869    ///
870    /// Returns an error if:
871    /// - The node is not a control-plane node
872    /// - The cluster is not yet bootstrapped
873    /// - Network/connection issues
874    pub async fn kubeconfig(&self) -> Result<KubeconfigResponse> {
875        use tonic::codegen::tokio_stream::StreamExt;
876
877        let mut stream = self.machine().kubeconfig(()).await?.into_inner();
878
879        let mut data = Vec::new();
880        let mut node = None;
881
882        while let Some(chunk) = stream.next().await {
883            let chunk = chunk?;
884            // Capture node from first chunk with metadata
885            if node.is_none() {
886                if let Some(metadata) = &chunk.metadata {
887                    node = Some(metadata.hostname.clone());
888                }
889            }
890            data.extend(chunk.bytes);
891        }
892
893        Ok(KubeconfigResponse::new(data, node))
894    }
895
896    /// Reset a Talos node, optionally wiping disks.
897    ///
898    /// # Warning
899    ///
900    /// This is a **destructive** operation. The node will be reset and may
901    /// lose all data depending on the wipe mode configured.
902    ///
903    /// # Arguments
904    ///
905    /// * `request` - The reset request configuration
906    ///
907    /// # Example
908    ///
909    /// ```ignore
910    /// use talos_api_rs::{TalosClient, TalosClientConfig, ResetRequest};
911    ///
912    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
913    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
914    /// let client = TalosClient::new(config).await?;
915    ///
916    /// // Graceful reset (leaves etcd cluster first)
917    /// let response = client.reset(ResetRequest::graceful()).await?;
918    ///
919    /// // Force reset with full disk wipe
920    /// let response = client.reset(ResetRequest::force()).await?;
921    ///
922    /// // Custom reset
923    /// let response = client.reset(
924    ///     ResetRequest::builder()
925    ///         .graceful(true)
926    ///         .reboot(true)
927    ///         .build()
928    /// ).await?;
929    /// # Ok(())
930    /// # }
931    /// ```
932    pub async fn reset(&self, request: ResetRequest) -> Result<ResetResponse> {
933        let mut client = MachineServiceClient::new(self.channel.clone());
934
935        let proto_request: ProtoResetRequest = request.into();
936        let response = client.reset(proto_request).await?;
937        let inner = response.into_inner();
938
939        Ok(ResetResponse::from(inner))
940    }
941
942    /// Gracefully reset a Talos node.
943    ///
944    /// This is a convenience method that performs a graceful reset, which:
945    /// - Leaves the etcd cluster gracefully (for control plane nodes)
946    /// - Reboots after reset
947    /// - Does not wipe disks
948    ///
949    /// For more control, use [`reset`](Self::reset) with a custom [`ResetRequest`].
950    pub async fn reset_graceful(&self) -> Result<ResetResponse> {
951        self.reset(ResetRequest::graceful()).await
952    }
953
954    // =========================================================================
955    // etcd Operations
956    // =========================================================================
957
958    /// List etcd cluster members.
959    ///
960    /// # Example
961    ///
962    /// ```ignore
963    /// use talos_api_rs::{TalosClient, TalosClientConfig, EtcdMemberListRequest};
964    ///
965    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
966    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
967    /// let client = TalosClient::new(config).await?;
968    ///
969    /// let response = client.etcd_member_list(EtcdMemberListRequest::new()).await?;
970    /// for member in response.all_members() {
971    ///     println!("{}: {}", member.id, member.hostname);
972    /// }
973    /// # Ok(())
974    /// # }
975    /// ```
976    pub async fn etcd_member_list(
977        &self,
978        request: EtcdMemberListRequest,
979    ) -> Result<EtcdMemberListResponse> {
980        let mut client = MachineServiceClient::new(self.channel.clone());
981
982        let proto_request: ProtoEtcdMemberListRequest = request.into();
983        let response = client.etcd_member_list(proto_request).await?;
984        let inner = response.into_inner();
985
986        Ok(EtcdMemberListResponse::from(inner))
987    }
988
989    /// Remove an etcd member by ID.
990    ///
991    /// Use this to remove members that no longer have an associated Talos node.
992    /// For nodes that are still running, use [`etcd_leave_cluster`](Self::etcd_leave_cluster).
993    ///
994    /// # Example
995    ///
996    /// ```ignore
997    /// use talos_api_rs::{TalosClient, TalosClientConfig, EtcdRemoveMemberByIdRequest};
998    ///
999    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1000    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
1001    /// let client = TalosClient::new(config).await?;
1002    ///
1003    /// // First, find the member ID
1004    /// let members = client.etcd_member_list(Default::default()).await?;
1005    /// if let Some(member) = members.find_by_hostname("old-node") {
1006    ///     client.etcd_remove_member_by_id(
1007    ///         EtcdRemoveMemberByIdRequest::new(member.id)
1008    ///     ).await?;
1009    /// }
1010    /// # Ok(())
1011    /// # }
1012    /// ```
1013    pub async fn etcd_remove_member_by_id(
1014        &self,
1015        request: EtcdRemoveMemberByIdRequest,
1016    ) -> Result<EtcdRemoveMemberByIdResponse> {
1017        let mut client = MachineServiceClient::new(self.channel.clone());
1018
1019        let proto_request: ProtoEtcdRemoveMemberByIdRequest = request.into();
1020        let response = client.etcd_remove_member_by_id(proto_request).await?;
1021        let inner = response.into_inner();
1022
1023        Ok(EtcdRemoveMemberByIdResponse::from(inner))
1024    }
1025
1026    /// Make a node leave the etcd cluster gracefully.
1027    ///
1028    /// This should be called on the node that is being removed.
1029    pub async fn etcd_leave_cluster(
1030        &self,
1031        request: EtcdLeaveClusterRequest,
1032    ) -> Result<EtcdLeaveClusterResponse> {
1033        let mut client = MachineServiceClient::new(self.channel.clone());
1034
1035        let proto_request: ProtoEtcdLeaveClusterRequest = request.into();
1036        let response = client.etcd_leave_cluster(proto_request).await?;
1037        let inner = response.into_inner();
1038
1039        Ok(EtcdLeaveClusterResponse::from(inner))
1040    }
1041
1042    /// Forfeit etcd leadership.
1043    ///
1044    /// Causes the current leader to step down and trigger a new election.
1045    pub async fn etcd_forfeit_leadership(
1046        &self,
1047        request: EtcdForfeitLeadershipRequest,
1048    ) -> Result<EtcdForfeitLeadershipResponse> {
1049        let mut client = MachineServiceClient::new(self.channel.clone());
1050
1051        let proto_request: ProtoEtcdForfeitLeadershipRequest = request.into();
1052        let response = client.etcd_forfeit_leadership(proto_request).await?;
1053        let inner = response.into_inner();
1054
1055        Ok(EtcdForfeitLeadershipResponse::from(inner))
1056    }
1057
1058    /// Get etcd status for the current member.
1059    pub async fn etcd_status(&self) -> Result<EtcdStatusResponse> {
1060        let mut client = MachineServiceClient::new(self.channel.clone());
1061
1062        let response = client.etcd_status(()).await?;
1063        let inner = response.into_inner();
1064
1065        Ok(EtcdStatusResponse::from(inner))
1066    }
1067
1068    /// List etcd alarms.
1069    pub async fn etcd_alarm_list(&self) -> Result<EtcdAlarmListResponse> {
1070        let mut client = MachineServiceClient::new(self.channel.clone());
1071
1072        let response = client.etcd_alarm_list(()).await?;
1073        let inner = response.into_inner();
1074
1075        Ok(EtcdAlarmListResponse::from(inner))
1076    }
1077
1078    /// Disarm etcd alarms.
1079    pub async fn etcd_alarm_disarm(&self) -> Result<EtcdAlarmDisarmResponse> {
1080        let mut client = MachineServiceClient::new(self.channel.clone());
1081
1082        let response = client.etcd_alarm_disarm(()).await?;
1083        let inner = response.into_inner();
1084
1085        Ok(EtcdAlarmDisarmResponse::from(inner))
1086    }
1087
1088    /// Defragment etcd storage.
1089    ///
1090    /// **Warning**: This is a resource-heavy operation.
1091    pub async fn etcd_defragment(&self) -> Result<EtcdDefragmentResponse> {
1092        let mut client = MachineServiceClient::new(self.channel.clone());
1093
1094        let response = client.etcd_defragment(()).await?;
1095        let inner = response.into_inner();
1096
1097        Ok(EtcdDefragmentResponse::from(inner))
1098    }
1099
1100    // =========================================================================
1101    // Diagnostics
1102    // =========================================================================
1103
1104    /// Get kernel message buffer (dmesg).
1105    ///
1106    /// This is a server-streaming RPC that returns kernel messages.
1107    ///
1108    /// # Example
1109    ///
1110    /// ```ignore
1111    /// use talos_api_rs::{TalosClient, TalosClientConfig, DmesgRequest};
1112    ///
1113    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1114    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
1115    /// let client = TalosClient::new(config).await?;
1116    ///
1117    /// let dmesg = client.dmesg(DmesgRequest::new()).await?;
1118    /// println!("{}", dmesg.as_string_lossy());
1119    /// # Ok(())
1120    /// # }
1121    /// ```
1122    pub async fn dmesg(&self, request: DmesgRequest) -> Result<DmesgResponse> {
1123        use tonic::codegen::tokio_stream::StreamExt;
1124
1125        let mut client = MachineServiceClient::new(self.channel.clone());
1126
1127        let proto_request: ProtoDmesgRequest = request.into();
1128        let response = client.dmesg(proto_request).await?;
1129        let mut stream = response.into_inner();
1130
1131        let mut data = Vec::new();
1132        let mut node = None;
1133
1134        while let Some(chunk) = stream.next().await {
1135            let chunk = chunk?;
1136            if node.is_none() {
1137                if let Some(metadata) = &chunk.metadata {
1138                    node = Some(metadata.hostname.clone());
1139                }
1140            }
1141            data.extend(chunk.bytes);
1142        }
1143
1144        Ok(DmesgResponse::new(data, node))
1145    }
1146
1147    // =========================================================================
1148    // Upgrade
1149    // =========================================================================
1150
1151    /// Upgrade a Talos node to a new version.
1152    ///
1153    /// # Example
1154    ///
1155    /// ```ignore
1156    /// use talos_api_rs::{TalosClient, TalosClientConfig, UpgradeRequest};
1157    ///
1158    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1159    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
1160    /// let client = TalosClient::new(config).await?;
1161    ///
1162    /// // Upgrade to a specific version
1163    /// let response = client.upgrade(
1164    ///     UpgradeRequest::new("ghcr.io/siderolabs/installer:v1.6.0")
1165    /// ).await?;
1166    ///
1167    /// // Staged upgrade (downloads but doesn't apply until reboot)
1168    /// let response = client.upgrade(
1169    ///     UpgradeRequest::builder("ghcr.io/siderolabs/installer:v1.6.0")
1170    ///         .stage(true)
1171    ///         .preserve(true)
1172    ///         .build()
1173    /// ).await?;
1174    /// # Ok(())
1175    /// # }
1176    /// ```
1177    pub async fn upgrade(&self, request: UpgradeRequest) -> Result<UpgradeResponse> {
1178        let mut client = MachineServiceClient::new(self.channel.clone());
1179
1180        let proto_request: ProtoUpgradeRequest = request.into();
1181        let response = client.upgrade(proto_request).await?;
1182        let inner = response.into_inner();
1183
1184        Ok(UpgradeResponse::from(inner))
1185    }
1186
1187    // =========================================================================
1188    // Service Management
1189    // =========================================================================
1190
1191    /// Start a service.
1192    pub async fn service_start(
1193        &self,
1194        request: ServiceStartRequest,
1195    ) -> Result<ServiceStartResponse> {
1196        let mut client = MachineServiceClient::new(self.channel.clone());
1197
1198        let proto_request: ProtoServiceStartRequest = request.into();
1199        let response = client.service_start(proto_request).await?;
1200        let inner = response.into_inner();
1201
1202        Ok(ServiceStartResponse::from(inner))
1203    }
1204
1205    /// Stop a service.
1206    pub async fn service_stop(&self, request: ServiceStopRequest) -> Result<ServiceStopResponse> {
1207        let mut client = MachineServiceClient::new(self.channel.clone());
1208
1209        let proto_request: ProtoServiceStopRequest = request.into();
1210        let response = client.service_stop(proto_request).await?;
1211        let inner = response.into_inner();
1212
1213        Ok(ServiceStopResponse::from(inner))
1214    }
1215
1216    /// Restart a service.
1217    pub async fn service_restart(
1218        &self,
1219        request: ServiceRestartRequest,
1220    ) -> Result<ServiceRestartResponse> {
1221        let mut client = MachineServiceClient::new(self.channel.clone());
1222
1223        let proto_request: ProtoServiceRestartRequest = request.into();
1224        let response = client.service_restart(proto_request).await?;
1225        let inner = response.into_inner();
1226
1227        Ok(ServiceRestartResponse::from(inner))
1228    }
1229
1230    /// Get service/container logs (server-streaming).
1231    pub async fn logs(&self, request: LogsRequest) -> Result<LogsResponse> {
1232        use tonic::codegen::tokio_stream::StreamExt;
1233
1234        let mut client = MachineServiceClient::new(self.channel.clone());
1235
1236        let proto_request: ProtoLogsRequest = request.into();
1237        let response = client.logs(proto_request).await?;
1238        let mut stream = response.into_inner();
1239
1240        let mut data = Vec::new();
1241        let mut node = None;
1242
1243        while let Some(chunk) = stream.next().await {
1244            let chunk = chunk?;
1245            if node.is_none() {
1246                if let Some(metadata) = &chunk.metadata {
1247                    node = Some(metadata.hostname.clone());
1248                }
1249            }
1250            data.extend(chunk.bytes);
1251        }
1252
1253        Ok(LogsResponse::new(data, node))
1254    }
1255
1256    // =========================================================================
1257    // System Information
1258    // =========================================================================
1259
1260    /// Get system load averages.
1261    pub async fn load_avg(&self) -> Result<LoadAvgResponse> {
1262        let mut client = MachineServiceClient::new(self.channel.clone());
1263
1264        let response = client.load_avg(()).await?;
1265        let inner = response.into_inner();
1266
1267        Ok(LoadAvgResponse::from(inner))
1268    }
1269
1270    /// Get memory information.
1271    pub async fn memory(&self) -> Result<MemoryResponse> {
1272        let mut client = MachineServiceClient::new(self.channel.clone());
1273
1274        let response = client.memory(()).await?;
1275        let inner = response.into_inner();
1276
1277        Ok(MemoryResponse::from(inner))
1278    }
1279
1280    /// Get CPU information.
1281    pub async fn cpu_info(&self) -> Result<CpuInfoResponse> {
1282        let mut client = MachineServiceClient::new(self.channel.clone());
1283
1284        let response = client.cpu_info(()).await?;
1285        let inner = response.into_inner();
1286
1287        Ok(CpuInfoResponse::from(inner))
1288    }
1289
1290    /// Get disk statistics.
1291    pub async fn disk_stats(&self) -> Result<DiskStatsResponse> {
1292        let mut client = MachineServiceClient::new(self.channel.clone());
1293
1294        let response = client.disk_stats(()).await?;
1295        let inner = response.into_inner();
1296
1297        Ok(DiskStatsResponse::from(inner))
1298    }
1299
1300    /// Get network device statistics.
1301    pub async fn network_device_stats(&self) -> Result<NetworkDeviceStatsResponse> {
1302        let mut client = MachineServiceClient::new(self.channel.clone());
1303
1304        let response = client.network_device_stats(()).await?;
1305        let inner = response.into_inner();
1306
1307        Ok(NetworkDeviceStatsResponse::from(inner))
1308    }
1309
1310    /// Get mount points.
1311    pub async fn mounts(&self) -> Result<MountsResponse> {
1312        let mut client = MachineServiceClient::new(self.channel.clone());
1313
1314        let response = client.mounts(()).await?;
1315        let inner = response.into_inner();
1316
1317        Ok(MountsResponse::from(inner))
1318    }
1319
1320    /// Get process list.
1321    pub async fn processes(&self) -> Result<ProcessesResponse> {
1322        let mut client = MachineServiceClient::new(self.channel.clone());
1323
1324        let response = client.processes(()).await?;
1325        let inner = response.into_inner();
1326
1327        Ok(ProcessesResponse::from(inner))
1328    }
1329
1330    // =========================================================================
1331    // File Operations
1332    // =========================================================================
1333
1334    /// List directory contents (server-streaming).
1335    pub async fn list(&self, request: ListRequest) -> Result<ListResponse> {
1336        use tonic::codegen::tokio_stream::StreamExt;
1337
1338        let mut client = MachineServiceClient::new(self.channel.clone());
1339
1340        let proto_request: ProtoListRequest = request.into();
1341        let response = client.list(proto_request).await?;
1342        let mut stream = response.into_inner();
1343
1344        let mut entries = Vec::new();
1345        while let Some(info) = stream.next().await {
1346            let info = info?;
1347            entries.push(FileInfo::from(info));
1348        }
1349
1350        Ok(ListResponse::new(entries))
1351    }
1352
1353    /// Read a file (server-streaming).
1354    pub async fn read(&self, request: ReadRequest) -> Result<ReadResponse> {
1355        use tonic::codegen::tokio_stream::StreamExt;
1356
1357        let mut client = MachineServiceClient::new(self.channel.clone());
1358
1359        let proto_request: ProtoReadRequest = request.into();
1360        let response = client.read(proto_request).await?;
1361        let mut stream = response.into_inner();
1362
1363        let mut data = Vec::new();
1364        let mut node = None;
1365
1366        while let Some(chunk) = stream.next().await {
1367            let chunk = chunk?;
1368            if node.is_none() {
1369                if let Some(metadata) = &chunk.metadata {
1370                    node = Some(metadata.hostname.clone());
1371                }
1372            }
1373            data.extend(chunk.bytes);
1374        }
1375
1376        Ok(ReadResponse::new(data, node))
1377    }
1378
1379    /// Copy a file or directory as tar archive (server-streaming).
1380    pub async fn copy(&self, request: CopyRequest) -> Result<CopyResponse> {
1381        use tonic::codegen::tokio_stream::StreamExt;
1382
1383        let mut client = MachineServiceClient::new(self.channel.clone());
1384
1385        let proto_request: ProtoCopyRequest = request.into();
1386        let response = client.copy(proto_request).await?;
1387        let mut stream = response.into_inner();
1388
1389        let mut data = Vec::new();
1390        let mut node = None;
1391
1392        while let Some(chunk) = stream.next().await {
1393            let chunk = chunk?;
1394            if node.is_none() {
1395                if let Some(metadata) = &chunk.metadata {
1396                    node = Some(metadata.hostname.clone());
1397                }
1398            }
1399            data.extend(chunk.bytes);
1400        }
1401
1402        Ok(CopyResponse::new(data, node))
1403    }
1404
1405    /// Get disk usage (server-streaming).
1406    pub async fn disk_usage(&self, request: DiskUsageRequest) -> Result<DiskUsageResponse> {
1407        use tonic::codegen::tokio_stream::StreamExt;
1408
1409        let mut client = MachineServiceClient::new(self.channel.clone());
1410
1411        let proto_request: ProtoDiskUsageRequest = request.into();
1412        let response = client.disk_usage(proto_request).await?;
1413        let mut stream = response.into_inner();
1414
1415        let mut entries = Vec::new();
1416        while let Some(info) = stream.next().await {
1417            let info = info?;
1418            entries.push(DiskUsageInfo::from(info));
1419        }
1420
1421        Ok(DiskUsageResponse::new(entries))
1422    }
1423
1424    // =========================================================================
1425    // Advanced APIs
1426    // =========================================================================
1427
1428    /// Rollback a Talos node to the previous installed version.
1429    pub async fn rollback(&self) -> Result<RollbackResponse> {
1430        let mut client = MachineServiceClient::new(self.channel.clone());
1431
1432        let response = client.rollback(ProtoRollbackRequest {}).await?;
1433        let inner = response.into_inner();
1434
1435        Ok(RollbackResponse::from(inner))
1436    }
1437
1438    /// Generate client configuration (talosconfig).
1439    pub async fn generate_client_configuration(
1440        &self,
1441        request: GenerateClientConfigurationRequest,
1442    ) -> Result<GenerateClientConfigurationResponse> {
1443        let mut client = MachineServiceClient::new(self.channel.clone());
1444
1445        let proto_request: ProtoGenerateClientConfigRequest = request.into();
1446        let response = client.generate_client_configuration(proto_request).await?;
1447        let inner = response.into_inner();
1448
1449        Ok(GenerateClientConfigurationResponse::from(inner))
1450    }
1451
1452    /// Capture packets on a network interface (server-streaming).
1453    pub async fn packet_capture(
1454        &self,
1455        request: PacketCaptureRequest,
1456    ) -> Result<PacketCaptureResponse> {
1457        use tonic::codegen::tokio_stream::StreamExt;
1458
1459        let mut client = MachineServiceClient::new(self.channel.clone());
1460
1461        let proto_request: ProtoPacketCaptureRequest = request.into();
1462        let response = client.packet_capture(proto_request).await?;
1463        let mut stream = response.into_inner();
1464
1465        let mut data = Vec::new();
1466        let mut node = None;
1467
1468        while let Some(chunk) = stream.next().await {
1469            let chunk = chunk?;
1470            if node.is_none() {
1471                if let Some(metadata) = &chunk.metadata {
1472                    node = Some(metadata.hostname.clone());
1473                }
1474            }
1475            data.extend(chunk.bytes);
1476        }
1477
1478        Ok(PacketCaptureResponse::new(data, node))
1479    }
1480
1481    /// Get network connection information (netstat).
1482    pub async fn netstat(&self, request: NetstatRequest) -> Result<NetstatResponse> {
1483        let mut client = MachineServiceClient::new(self.channel.clone());
1484
1485        let proto_request: ProtoNetstatRequest = request.into();
1486        let response = client.netstat(proto_request).await?;
1487        let inner = response.into_inner();
1488
1489        Ok(NetstatResponse::from(inner))
1490    }
1491
1492    // ========================= Container Image APIs =========================
1493
1494    /// List container images in the specified containerd namespace.
1495    ///
1496    /// Returns a list of images from the node's containerd registry.
1497    ///
1498    /// # Arguments
1499    ///
1500    /// * `request` - The image list request specifying the namespace to query.
1501    ///
1502    /// # Example
1503    ///
1504    /// ```no_run
1505    /// # use talos_api_rs::client::{TalosClient, TalosClientConfig};
1506    /// # use talos_api_rs::resources::ImageListRequest;
1507    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1508    /// let config = TalosClientConfig::new("https://192.168.1.100:50000");
1509    /// let client = TalosClient::new(config).await?;
1510    ///
1511    /// // List all system images
1512    /// let images = client.image_list(ImageListRequest::system()).await?;
1513    /// for image in images {
1514    ///     println!("{}: {}", image.name, image.size_human());
1515    /// }
1516    ///
1517    /// // List Kubernetes images
1518    /// let cri_images = client.image_list(ImageListRequest::cri()).await?;
1519    /// # Ok(())
1520    /// # }
1521    /// ```
1522    pub async fn image_list(&self, request: ImageListRequest) -> Result<Vec<ImageInfo>> {
1523        use tonic::codegen::tokio_stream::StreamExt;
1524
1525        let mut client = MachineServiceClient::new(self.channel.clone());
1526        let proto_request: ProtoImageListRequest = request.into();
1527        let response = client.image_list(proto_request).await?;
1528
1529        let mut stream = response.into_inner();
1530        let mut images = Vec::new();
1531
1532        // Each item in the stream is a single image
1533        while let Some(item) = stream.next().await {
1534            let item = item?;
1535            // Check for errors in metadata
1536            if let Some(ref metadata) = item.metadata {
1537                if !metadata.error.is_empty() {
1538                    return Err(crate::error::TalosError::Validation(metadata.error.clone()));
1539                }
1540            }
1541            images.push(ImageInfo::from(item));
1542        }
1543
1544        Ok(images)
1545    }
1546
1547    /// Pull a container image into the node's containerd registry.
1548    ///
1549    /// Downloads and stores the specified image so it's available locally.
1550    ///
1551    /// # Arguments
1552    ///
1553    /// * `request` - The image pull request specifying the image reference and namespace.
1554    ///
1555    /// # Example
1556    ///
1557    /// ```no_run
1558    /// # use talos_api_rs::client::{TalosClient, TalosClientConfig};
1559    /// # use talos_api_rs::ImagePullRequest;
1560    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1561    /// let config = TalosClientConfig::new("https://192.168.1.100:50000");
1562    /// let client = TalosClient::new(config).await?;
1563    ///
1564    /// // Pull a specific image
1565    /// let response = client.image_pull(
1566    ///     ImagePullRequest::new("docker.io/library/nginx:latest")
1567    /// ).await?;
1568    /// println!("Pulled images: {:?}", response.results);
1569    /// # Ok(())
1570    /// # }
1571    /// ```
1572    pub async fn image_pull(&self, request: ImagePullRequest) -> Result<ImagePullResponse> {
1573        let mut client = MachineServiceClient::new(self.channel.clone());
1574        let proto_request: ProtoImagePullRequest = request.into();
1575        let response = client.image_pull(proto_request).await?;
1576        let inner = response.into_inner();
1577
1578        Ok(ImagePullResponse::from(inner))
1579    }
1580}
1581
/// Server-certificate verifier for insecure mode.
///
/// Its `ServerCertVerifier` implementation returns success from every check
/// without inspecting any of its arguments, so it does not authenticate the
/// peer in any way.
///
/// NOTE(review): this should only be reachable when the caller explicitly opts
/// into insecure connections — confirm at the construction site.
#[derive(Debug)]
struct NoVerifier;
1585
1586impl rustls::client::danger::ServerCertVerifier for NoVerifier {
1587    fn verify_server_cert(
1588        &self,
1589        _end_entity: &rustls::pki_types::CertificateDer<'_>,
1590        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
1591        _server_name: &rustls::pki_types::ServerName<'_>,
1592        _ocsp_response: &[u8],
1593        _now: rustls::pki_types::UnixTime,
1594    ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
1595        Ok(rustls::client::danger::ServerCertVerified::assertion())
1596    }
1597
1598    fn verify_tls12_signature(
1599        &self,
1600        _message: &[u8],
1601        _cert: &rustls::pki_types::CertificateDer<'_>,
1602        _dss: &rustls::DigitallySignedStruct,
1603    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1604        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1605    }
1606
1607    fn verify_tls13_signature(
1608        &self,
1609        _message: &[u8],
1610        _cert: &rustls::pki_types::CertificateDer<'_>,
1611        _dss: &rustls::DigitallySignedStruct,
1612    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1613        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1614    }
1615
1616    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1617        vec![
1618            rustls::SignatureScheme::RSA_PKCS1_SHA1,
1619            rustls::SignatureScheme::ECDSA_SHA1_Legacy,
1620            rustls::SignatureScheme::RSA_PKCS1_SHA256,
1621            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
1622            rustls::SignatureScheme::RSA_PKCS1_SHA384,
1623            rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
1624            rustls::SignatureScheme::RSA_PKCS1_SHA512,
1625            rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
1626            rustls::SignatureScheme::RSA_PSS_SHA256,
1627            rustls::SignatureScheme::RSA_PSS_SHA384,
1628            rustls::SignatureScheme::RSA_PSS_SHA512,
1629            rustls::SignatureScheme::ED25519,
1630            rustls::SignatureScheme::ED448,
1631        ]
1632    }
1633}
1634
1635pub use pool::{ConnectionPool, ConnectionPoolConfig, EndpointHealth, HealthStatus, LoadBalancer};