Skip to main content

talos_api_rs/client/
mod.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use crate::api::machine::machine_service_client::MachineServiceClient;
4use crate::api::machine::ApplyConfigurationRequest as ProtoApplyConfigRequest;
5use crate::api::machine::BootstrapRequest as ProtoBootstrapRequest;
6use crate::api::machine::CopyRequest as ProtoCopyRequest;
7use crate::api::machine::DiskUsageRequest as ProtoDiskUsageRequest;
8use crate::api::machine::DmesgRequest as ProtoDmesgRequest;
9use crate::api::machine::EtcdForfeitLeadershipRequest as ProtoEtcdForfeitLeadershipRequest;
10use crate::api::machine::EtcdLeaveClusterRequest as ProtoEtcdLeaveClusterRequest;
11use crate::api::machine::EtcdMemberListRequest as ProtoEtcdMemberListRequest;
12use crate::api::machine::EtcdRemoveMemberByIdRequest as ProtoEtcdRemoveMemberByIdRequest;
13use crate::api::machine::GenerateClientConfigurationRequest as ProtoGenerateClientConfigRequest;
14use crate::api::machine::ListRequest as ProtoListRequest;
15use crate::api::machine::LogsRequest as ProtoLogsRequest;
16use crate::api::machine::NetstatRequest as ProtoNetstatRequest;
17use crate::api::machine::PacketCaptureRequest as ProtoPacketCaptureRequest;
18use crate::api::machine::ReadRequest as ProtoReadRequest;
19use crate::api::machine::ResetRequest as ProtoResetRequest;
20use crate::api::machine::RollbackRequest as ProtoRollbackRequest;
21use crate::api::machine::ServiceRestartRequest as ProtoServiceRestartRequest;
22use crate::api::machine::ServiceStartRequest as ProtoServiceStartRequest;
23use crate::api::machine::ServiceStopRequest as ProtoServiceStopRequest;
24use crate::api::machine::UpgradeRequest as ProtoUpgradeRequest;
25use crate::api::version::version_service_client::VersionServiceClient;
26use crate::error::Result;
27use crate::resources::{
28    ApplyConfigurationRequest, ApplyConfigurationResponse, BootstrapRequest, BootstrapResponse,
29    CopyRequest, CopyResponse, CpuInfoResponse, DiskStatsResponse, DiskUsageInfo, DiskUsageRequest,
30    DiskUsageResponse, DmesgRequest, DmesgResponse, EtcdAlarmDisarmResponse, EtcdAlarmListResponse,
31    EtcdDefragmentResponse, EtcdForfeitLeadershipRequest, EtcdForfeitLeadershipResponse,
32    EtcdLeaveClusterRequest, EtcdLeaveClusterResponse, EtcdMemberListRequest,
33    EtcdMemberListResponse, EtcdRemoveMemberByIdRequest, EtcdRemoveMemberByIdResponse,
34    EtcdStatusResponse, FileInfo, GenerateClientConfigurationRequest,
35    GenerateClientConfigurationResponse, KubeconfigResponse, ListRequest, ListResponse,
36    LoadAvgResponse, LogsRequest, LogsResponse, MemoryResponse, MountsResponse, NetstatRequest,
37    NetstatResponse, NetworkDeviceStatsResponse, PacketCaptureRequest, PacketCaptureResponse,
38    ProcessesResponse, ReadRequest, ReadResponse, ResetRequest, ResetResponse, RollbackResponse,
39    ServiceRestartRequest, ServiceRestartResponse, ServiceStartRequest, ServiceStartResponse,
40    ServiceStopRequest, ServiceStopResponse, UpgradeRequest, UpgradeResponse,
41};
42use hyper_util::rt::TokioIo;
43use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName};
44use std::sync::Arc;
45use std::time::Duration;
46use tonic::transport::{Channel, Endpoint};
47
/// Configuration for the Talos API client.
///
/// All fields are public, so a value can be assembled with struct-update
/// syntax on top of [`Default::default`], with the chained `with_*` methods,
/// or through [`TalosClientConfigBuilder`].
#[derive(Clone, Debug)]
pub struct TalosClientConfig {
    /// The gRPC endpoint URL (the default is `https://127.0.0.1:50000`).
    ///
    /// An endpoint starting with `http://` is connected without any TLS.
    pub endpoint: String,
    /// Path to the PEM-encoded client certificate.
    ///
    /// Client authentication is configured only when both this and
    /// `key_path` are set.
    pub crt_path: Option<String>,
    /// Path to the PEM-encoded client private key.
    pub key_path: Option<String>,
    /// Path to the PEM-encoded CA certificate used to verify the server.
    ///
    /// When `None`, the bundled `webpki_roots` trust anchors are used.
    pub ca_path: Option<String>,
    /// If true, skips TLS verification (insecure).
    pub insecure: bool,
    /// Connection timeout for establishing the gRPC channel.
    pub connect_timeout: Option<Duration>,
    /// Request timeout for individual RPC calls.
    pub request_timeout: Option<Duration>,
    /// Keepalive interval for long-running connections.
    ///
    /// Only applied when `keepalive_timeout` is set as well.
    pub keepalive_interval: Option<Duration>,
    /// Keepalive timeout.
    ///
    /// Only applied when `keepalive_interval` is set as well.
    pub keepalive_timeout: Option<Duration>,
}
70
71impl Default for TalosClientConfig {
72    fn default() -> Self {
73        Self {
74            endpoint: "https://127.0.0.1:50000".to_string(),
75            crt_path: None,
76            key_path: None,
77            ca_path: None,
78            insecure: false,
79            connect_timeout: Some(Duration::from_secs(10)),
80            request_timeout: Some(Duration::from_secs(30)),
81            keepalive_interval: Some(Duration::from_secs(30)),
82            keepalive_timeout: Some(Duration::from_secs(10)),
83        }
84    }
85}
86
87impl TalosClientConfig {
88    /// Create a new configuration with an endpoint.
89    #[must_use]
90    pub fn new(endpoint: impl Into<String>) -> Self {
91        Self {
92            endpoint: endpoint.into(),
93            ..Default::default()
94        }
95    }
96
97    /// Create a builder for more complex configuration.
98    #[must_use]
99    pub fn builder(endpoint: impl Into<String>) -> TalosClientConfigBuilder {
100        TalosClientConfigBuilder::new(endpoint)
101    }
102
103    /// Set client certificate path.
104    #[must_use]
105    pub fn with_client_cert(mut self, crt_path: impl Into<String>) -> Self {
106        self.crt_path = Some(crt_path.into());
107        self
108    }
109
110    /// Set client key path.
111    #[must_use]
112    pub fn with_client_key(mut self, key_path: impl Into<String>) -> Self {
113        self.key_path = Some(key_path.into());
114        self
115    }
116
117    /// Set CA certificate path.
118    #[must_use]
119    pub fn with_ca(mut self, ca_path: impl Into<String>) -> Self {
120        self.ca_path = Some(ca_path.into());
121        self
122    }
123
124    /// Enable insecure mode (skip TLS verification).
125    #[must_use]
126    pub fn insecure(mut self) -> Self {
127        self.insecure = true;
128        self
129    }
130
131    /// Set connect timeout.
132    #[must_use]
133    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
134        self.connect_timeout = Some(timeout);
135        self
136    }
137
138    /// Set request timeout.
139    #[must_use]
140    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
141        self.request_timeout = Some(timeout);
142        self
143    }
144
145    /// Disable all timeouts.
146    #[must_use]
147    pub fn no_timeout(mut self) -> Self {
148        self.connect_timeout = None;
149        self.request_timeout = None;
150        self
151    }
152}
153
/// Builder for `TalosClientConfig`.
///
/// Holds one private field per `TalosClientConfig` field; `build` copies them
/// across unchanged.
#[derive(Debug, Clone)]
pub struct TalosClientConfigBuilder {
    // The gRPC endpoint URL.
    endpoint: String,
    // Path to the client certificate, if any.
    crt_path: Option<String>,
    // Path to the client private key, if any.
    key_path: Option<String>,
    // Path to the CA certificate, if any.
    ca_path: Option<String>,
    // Whether TLS verification is skipped.
    insecure: bool,
    // Timeout for establishing the channel; `None` disables it.
    connect_timeout: Option<Duration>,
    // Timeout for individual requests; `None` disables it.
    request_timeout: Option<Duration>,
    // Keepalive interval.
    keepalive_interval: Option<Duration>,
    // Keepalive timeout.
    keepalive_timeout: Option<Duration>,
}
167
168impl TalosClientConfigBuilder {
169    /// Create a new builder.
170    #[must_use]
171    pub fn new(endpoint: impl Into<String>) -> Self {
172        Self {
173            endpoint: endpoint.into(),
174            crt_path: None,
175            key_path: None,
176            ca_path: None,
177            insecure: false,
178            connect_timeout: Some(Duration::from_secs(10)),
179            request_timeout: Some(Duration::from_secs(30)),
180            keepalive_interval: Some(Duration::from_secs(30)),
181            keepalive_timeout: Some(Duration::from_secs(10)),
182        }
183    }
184
185    /// Set client certificate path.
186    #[must_use]
187    pub fn client_cert(mut self, path: impl Into<String>) -> Self {
188        self.crt_path = Some(path.into());
189        self
190    }
191
192    /// Set client key path.
193    #[must_use]
194    pub fn client_key(mut self, path: impl Into<String>) -> Self {
195        self.key_path = Some(path.into());
196        self
197    }
198
199    /// Set CA certificate path.
200    #[must_use]
201    pub fn ca_cert(mut self, path: impl Into<String>) -> Self {
202        self.ca_path = Some(path.into());
203        self
204    }
205
206    /// Enable insecure mode.
207    #[must_use]
208    pub fn insecure(mut self) -> Self {
209        self.insecure = true;
210        self
211    }
212
213    /// Set connect timeout.
214    #[must_use]
215    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
216        self.connect_timeout = Some(timeout);
217        self
218    }
219
220    /// Set request timeout.
221    #[must_use]
222    pub fn request_timeout(mut self, timeout: Duration) -> Self {
223        self.request_timeout = Some(timeout);
224        self
225    }
226
227    /// Set keepalive settings.
228    #[must_use]
229    pub fn keepalive(mut self, interval: Duration, timeout: Duration) -> Self {
230        self.keepalive_interval = Some(interval);
231        self.keepalive_timeout = Some(timeout);
232        self
233    }
234
235    /// Disable timeouts.
236    #[must_use]
237    pub fn no_timeout(mut self) -> Self {
238        self.connect_timeout = None;
239        self.request_timeout = None;
240        self
241    }
242
243    /// Build the configuration.
244    #[must_use]
245    pub fn build(self) -> TalosClientConfig {
246        TalosClientConfig {
247            endpoint: self.endpoint,
248            crt_path: self.crt_path,
249            key_path: self.key_path,
250            ca_path: self.ca_path,
251            insecure: self.insecure,
252            connect_timeout: self.connect_timeout,
253            request_timeout: self.request_timeout,
254            keepalive_interval: self.keepalive_interval,
255            keepalive_timeout: self.keepalive_timeout,
256        }
257    }
258}
259
/// Client for the Talos gRPC API.
///
/// Wraps a connected `tonic` [`Channel`] together with the configuration it
/// was created from. Service clients handed out by the accessor methods are
/// built on clones of that channel.
#[derive(Clone)]
pub struct TalosClient {
    // The configuration the channel was built from; not read after
    // construction yet, hence the lint allowance.
    #[allow(dead_code)] // TODO: Remove when config is used
    config: TalosClientConfig,
    // The established gRPC channel that every service client is built on.
    channel: Channel,
}
266
267impl TalosClient {
268    pub async fn new(config: TalosClientConfig) -> Result<Self> {
269        // Install ring as default crypto provider (supports ED25519)
270        let _ = rustls::crypto::ring::default_provider().install_default();
271
272        // Check if using plain HTTP (no TLS)
273        let is_http = config.endpoint.starts_with("http://");
274
275        let channel = if is_http {
276            // Plain HTTP - no TLS at all
277            Self::create_http_channel(&config).await?
278        } else if config.insecure {
279            Self::create_insecure_channel(&config).await?
280        } else {
281            Self::create_mtls_channel(&config).await?
282        };
283
284        Ok(Self { config, channel })
285    }
286
287    /// Create a plain HTTP channel (no TLS)
288    async fn create_http_channel(config: &TalosClientConfig) -> Result<Channel> {
289        let mut endpoint = Channel::from_shared(config.endpoint.clone())
290            .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
291
292        // Apply timeout configuration
293        if let Some(timeout) = config.connect_timeout {
294            endpoint = endpoint.connect_timeout(timeout);
295        }
296        if let Some(timeout) = config.request_timeout {
297            endpoint = endpoint.timeout(timeout);
298        }
299        if let Some(interval) = config.keepalive_interval {
300            if let Some(ka_timeout) = config.keepalive_timeout {
301                endpoint = endpoint
302                    .http2_keep_alive_interval(interval)
303                    .keep_alive_timeout(ka_timeout);
304            }
305        }
306
307        let channel = endpoint.connect().await?;
308        Ok(channel)
309    }
310
311    /// Create an insecure channel (TLS without certificate verification)
312    async fn create_insecure_channel(config: &TalosClientConfig) -> Result<Channel> {
313        let tls_config = rustls::ClientConfig::builder()
314            .with_root_certificates(rustls::RootCertStore::empty())
315            .with_no_client_auth();
316
317        Self::connect_with_custom_tls(config, tls_config, true).await
318    }
319
320    /// Create an mTLS channel with full certificate verification
321    async fn create_mtls_channel(config: &TalosClientConfig) -> Result<Channel> {
322        // Load CA certificate
323        let root_store = if let Some(ca_path) = &config.ca_path {
324            let ca_pem = std::fs::read(ca_path).map_err(|e| {
325                crate::error::TalosError::Config(format!("Failed to read CA cert: {e}"))
326            })?;
327            let mut root_store = rustls::RootCertStore::empty();
328            let certs = Self::load_pem_certs(&ca_pem)?;
329            for cert in certs {
330                root_store.add(cert).map_err(|e| {
331                    crate::error::TalosError::Config(format!("Failed to add CA cert: {e}"))
332                })?;
333            }
334            root_store
335        } else {
336            // Use system roots if no CA provided
337            let mut root_store = rustls::RootCertStore::empty();
338            root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
339            root_store
340        };
341
342        // Build TLS config with or without client auth
343        let tls_config =
344            if let (Some(crt_path), Some(key_path)) = (&config.crt_path, &config.key_path) {
345                // mTLS with client certificate
346                let cert_pem = std::fs::read(crt_path).map_err(|e| {
347                    crate::error::TalosError::Config(format!("Failed to read client cert: {e}"))
348                })?;
349                let key_pem = std::fs::read(key_path).map_err(|e| {
350                    crate::error::TalosError::Config(format!("Failed to read client key: {e}"))
351                })?;
352
353                let client_certs = Self::load_pem_certs(&cert_pem)?;
354                let client_key = Self::load_pem_key(&key_pem)?;
355
356                rustls::ClientConfig::builder()
357                    .with_root_certificates(root_store)
358                    .with_client_auth_cert(client_certs, client_key)
359                    .map_err(|e| {
360                        crate::error::TalosError::Config(format!(
361                            "Failed to configure client auth: {e}"
362                        ))
363                    })?
364            } else {
365                // TLS without client auth
366                rustls::ClientConfig::builder()
367                    .with_root_certificates(root_store)
368                    .with_no_client_auth()
369            };
370
371        Self::connect_with_custom_tls(config, tls_config, false).await
372    }
373
374    /// Connect using a custom rustls TLS configuration
375    async fn connect_with_custom_tls(
376        config: &TalosClientConfig,
377        mut tls_config: rustls::ClientConfig,
378        skip_verification: bool,
379    ) -> Result<Channel> {
380        // Override verifier for insecure mode
381        if skip_verification {
382            tls_config
383                .dangerous()
384                .set_certificate_verifier(Arc::new(NoVerifier));
385        }
386
387        // gRPC requires ALPN h2
388        tls_config.alpn_protocols = vec![b"h2".to_vec()];
389        let tls_config = Arc::new(tls_config);
390        let connector = tokio_rustls::TlsConnector::from(tls_config);
391
392        // Extract host for SNI
393        let endpoint_url = if config.endpoint.starts_with("http") {
394            config.endpoint.clone()
395        } else {
396            format!("https://{}", config.endpoint)
397        };
398        let parsed_url = url::Url::parse(&endpoint_url)
399            .map_err(|e| crate::error::TalosError::Config(format!("Invalid endpoint URL: {e}")))?;
400        let host = parsed_url
401            .host_str()
402            .ok_or_else(|| crate::error::TalosError::Config("No host in endpoint".to_string()))?
403            .to_string();
404        let port = parsed_url.port().unwrap_or(50000);
405
406        // For custom connector, use http:// scheme (we handle TLS ourselves)
407        let endpoint_for_connector = format!("http://{}:{}", host, port);
408
409        // Build endpoint with timeout configuration
410        let mut endpoint = Endpoint::from_shared(endpoint_for_connector)
411            .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
412
413        // Apply timeout configuration
414        if let Some(timeout) = config.connect_timeout {
415            endpoint = endpoint.connect_timeout(timeout);
416        }
417        if let Some(timeout) = config.request_timeout {
418            endpoint = endpoint.timeout(timeout);
419        }
420        if let Some(interval) = config.keepalive_interval {
421            if let Some(ka_timeout) = config.keepalive_timeout {
422                endpoint = endpoint
423                    .http2_keep_alive_interval(interval)
424                    .keep_alive_timeout(ka_timeout);
425            }
426        }
427
428        let channel = endpoint
429            .connect_with_connector(tower::service_fn(move |uri: tonic::transport::Uri| {
430                let connector = connector.clone();
431                let host = host.clone();
432                async move {
433                    let uri_host = uri.host().unwrap_or("127.0.0.1");
434                    let uri_port = uri.port_u16().unwrap_or(50000);
435                    let addr = format!("{}:{}", uri_host, uri_port);
436
437                    let tcp = tokio::net::TcpStream::connect(addr).await?;
438
439                    // Use actual hostname for SNI (important for cert verification)
440                    let server_name = ServerName::try_from(host.clone())
441                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
442
443                    let tls_stream = connector.connect(server_name, tcp).await?;
444                    Ok::<_, std::io::Error>(TokioIo::new(tls_stream))
445                }
446            }))
447            .await?;
448
449        Ok(channel)
450    }
451
452    /// Load PEM-encoded certificates
453    #[allow(clippy::result_large_err)]
454    fn load_pem_certs(pem_data: &[u8]) -> Result<Vec<CertificateDer<'static>>> {
455        let mut reader = std::io::BufReader::new(pem_data);
456        let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
457            .collect::<std::result::Result<Vec<_>, _>>()
458            .map_err(|e| {
459                crate::error::TalosError::Config(format!("Failed to parse PEM certificates: {e}"))
460            })?;
461        if certs.is_empty() {
462            return Err(crate::error::TalosError::Config(
463                "No certificates found in PEM data".to_string(),
464            ));
465        }
466        Ok(certs)
467    }
468
469    /// Load PEM-encoded private key (supports RSA, EC, PKCS8, and ED25519)
470    #[allow(clippy::result_large_err)]
471    fn load_pem_key(pem_data: &[u8]) -> Result<PrivateKeyDer<'static>> {
472        // First, try standard PEM formats via rustls_pemfile
473        let mut reader = std::io::BufReader::new(pem_data);
474
475        loop {
476            match rustls_pemfile::read_one(&mut reader) {
477                Ok(Some(rustls_pemfile::Item::Pkcs1Key(key))) => {
478                    return Ok(PrivateKeyDer::Pkcs1(key));
479                }
480                Ok(Some(rustls_pemfile::Item::Pkcs8Key(key))) => {
481                    return Ok(PrivateKeyDer::Pkcs8(key));
482                }
483                Ok(Some(rustls_pemfile::Item::Sec1Key(key))) => {
484                    return Ok(PrivateKeyDer::Sec1(key));
485                }
486                Ok(Some(_)) => {
487                    // Skip other PEM items (certificates, etc.)
488                    continue;
489                }
490                Ok(None) => {
491                    break;
492                }
493                Err(e) => {
494                    return Err(crate::error::TalosError::Config(format!(
495                        "Failed to parse PEM key: {e}"
496                    )));
497                }
498            }
499        }
500
501        // Fallback: Handle non-standard "ED25519 PRIVATE KEY" PEM label
502        // Talos uses this format, which is PKCS#8-encoded but with a custom label
503        let pem_str = std::str::from_utf8(pem_data)
504            .map_err(|e| crate::error::TalosError::Config(format!("Invalid UTF-8 in key: {e}")))?;
505
506        if pem_str.contains("-----BEGIN ED25519 PRIVATE KEY-----") {
507            // Extract the base64 content between the headers
508            let start_marker = "-----BEGIN ED25519 PRIVATE KEY-----";
509            let end_marker = "-----END ED25519 PRIVATE KEY-----";
510
511            if let Some(start) = pem_str.find(start_marker) {
512                if let Some(end) = pem_str.find(end_marker) {
513                    let base64_content = &pem_str[start + start_marker.len()..end];
514                    let base64_clean: String = base64_content
515                        .chars()
516                        .filter(|c| !c.is_whitespace())
517                        .collect();
518
519                    let der_bytes = base64::Engine::decode(
520                        &base64::engine::general_purpose::STANDARD,
521                        &base64_clean,
522                    )
523                    .map_err(|e| {
524                        crate::error::TalosError::Config(format!(
525                            "Failed to decode ED25519 key: {e}"
526                        ))
527                    })?;
528
529                    // ED25519 PRIVATE KEY is actually PKCS#8 encoded
530                    return Ok(PrivateKeyDer::Pkcs8(
531                        rustls::pki_types::PrivatePkcs8KeyDer::from(der_bytes),
532                    ));
533                }
534            }
535        }
536
537        Err(crate::error::TalosError::Config(
538            "No private key found in PEM data".to_string(),
539        ))
540    }
541
542    /// Access the Version API group
543    pub fn version(&self) -> VersionServiceClient<Channel> {
544        VersionServiceClient::new(self.channel.clone())
545    }
546
547    /// Access the Machine API group
548    pub fn machine(&self) -> MachineServiceClient<Channel> {
549        MachineServiceClient::new(self.channel.clone())
550    }
551
552    // ========================================================================
553    // High-level convenience methods
554    // ========================================================================
555
556    /// Apply a configuration to the node.
557    ///
558    /// This is a high-level wrapper around the `MachineService::ApplyConfiguration` RPC.
559    ///
560    /// # Example
561    ///
562    /// ```ignore
563    /// use talos_api_rs::{TalosClient, TalosClientConfig, ApplyConfigurationRequest, ApplyMode};
564    ///
565    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
566    /// let client = TalosClient::new(TalosClientConfig {
567    ///     endpoint: "https://192.168.1.100:50000".to_string(),
568    ///     insecure: true,
569    ///     ..Default::default()
570    /// }).await?;
571    ///
572    /// // Apply configuration in dry-run mode
573    /// let request = ApplyConfigurationRequest::builder()
574    ///     .config_yaml("machine:\n  type: worker")
575    ///     .mode(ApplyMode::NoReboot)
576    ///     .dry_run(true)
577    ///     .build();
578    ///
579    /// let response = client.apply_configuration(request).await?;
580    /// println!("Warnings: {:?}", response.all_warnings());
581    /// # Ok(())
582    /// # }
583    /// ```
584    ///
585    /// # Errors
586    ///
587    /// Returns an error if the RPC call fails or the configuration is invalid.
588    pub async fn apply_configuration(
589        &self,
590        request: ApplyConfigurationRequest,
591    ) -> Result<ApplyConfigurationResponse> {
592        let proto_request: ProtoApplyConfigRequest = request.into();
593        let response = self
594            .machine()
595            .apply_configuration(proto_request)
596            .await?
597            .into_inner();
598        Ok(response.into())
599    }
600
601    /// Apply a YAML configuration string to the node.
602    ///
603    /// Convenience method for simple configuration application.
604    ///
605    /// # Example
606    ///
607    /// ```ignore
608    /// use talos_api_rs::{TalosClient, TalosClientConfig, ApplyMode};
609    ///
610    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
611    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
612    /// let config_yaml = std::fs::read_to_string("machine.yaml")?;
613    /// let response = client.apply_configuration_yaml(&config_yaml, ApplyMode::Auto, false).await?;
614    /// # Ok(())
615    /// # }
616    /// ```
617    pub async fn apply_configuration_yaml(
618        &self,
619        yaml: &str,
620        mode: crate::ApplyMode,
621        dry_run: bool,
622    ) -> Result<ApplyConfigurationResponse> {
623        let request = ApplyConfigurationRequest::builder()
624            .config_yaml(yaml)
625            .mode(mode)
626            .dry_run(dry_run)
627            .build();
628        self.apply_configuration(request).await
629    }
630
631    /// Bootstrap the etcd cluster on this node.
632    ///
633    /// This initializes a new etcd cluster. **This should only be called ONCE**
634    /// on the first control-plane node when creating a new Talos cluster.
635    ///
636    /// # Example
637    ///
638    /// ```ignore
639    /// use talos_api_rs::{TalosClient, TalosClientConfig, BootstrapRequest};
640    ///
641    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
642    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
643    ///
644    /// // Bootstrap a new cluster
645    /// let response = client.bootstrap(BootstrapRequest::new()).await?;
646    /// println!("Bootstrap complete: {:?}", response.first());
647    /// # Ok(())
648    /// # }
649    /// ```
650    ///
651    /// # Recovery
652    ///
653    /// To recover from an etcd snapshot (uploaded via `EtcdRecover` RPC):
654    ///
655    /// ```ignore
656    /// use talos_api_rs::{TalosClient, TalosClientConfig, BootstrapRequest};
657    ///
658    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
659    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
660    /// let response = client.bootstrap(BootstrapRequest::recovery()).await?;
661    /// # Ok(())
662    /// # }
663    /// ```
664    ///
665    /// # Errors
666    ///
667    /// Returns an error if:
668    /// - The node is not a control-plane node
669    /// - etcd is already bootstrapped
670    /// - Network/connection issues
671    pub async fn bootstrap(&self, request: BootstrapRequest) -> Result<BootstrapResponse> {
672        let proto_request: ProtoBootstrapRequest = request.into();
673        let response = self.machine().bootstrap(proto_request).await?.into_inner();
674        Ok(response.into())
675    }
676
677    /// Bootstrap a new etcd cluster (convenience method).
678    ///
679    /// Equivalent to `bootstrap(BootstrapRequest::new())`.
680    pub async fn bootstrap_cluster(&self) -> Result<BootstrapResponse> {
681        self.bootstrap(BootstrapRequest::new()).await
682    }
683
684    /// Retrieve the kubeconfig from the cluster.
685    ///
686    /// This is a server-streaming RPC that retrieves the kubeconfig file
687    /// from a control-plane node. The kubeconfig can be used to access
688    /// the Kubernetes API of the cluster.
689    ///
690    /// # Example
691    ///
692    /// ```ignore
693    /// use talos_api_rs::{TalosClient, TalosClientConfig};
694    ///
695    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
696    /// let client = TalosClient::new(TalosClientConfig::default()).await?;
697    ///
698    /// // Get kubeconfig
699    /// let kubeconfig = client.kubeconfig().await?;
700    /// println!("Kubeconfig from node: {:?}", kubeconfig.node);
701    ///
702    /// // Write to file
703    /// kubeconfig.write_to_file("kubeconfig.yaml")?;
704    ///
705    /// // Or get as string
706    /// let yaml = kubeconfig.as_str()?;
707    /// println!("{}", yaml);
708    /// # Ok(())
709    /// # }
710    /// ```
711    ///
712    /// # Errors
713    ///
714    /// Returns an error if:
715    /// - The node is not a control-plane node
716    /// - The cluster is not yet bootstrapped
717    /// - Network/connection issues
718    pub async fn kubeconfig(&self) -> Result<KubeconfigResponse> {
719        use tonic::codegen::tokio_stream::StreamExt;
720
721        let mut stream = self.machine().kubeconfig(()).await?.into_inner();
722
723        let mut data = Vec::new();
724        let mut node = None;
725
726        while let Some(chunk) = stream.next().await {
727            let chunk = chunk?;
728            // Capture node from first chunk with metadata
729            if node.is_none() {
730                if let Some(metadata) = &chunk.metadata {
731                    node = Some(metadata.hostname.clone());
732                }
733            }
734            data.extend(chunk.bytes);
735        }
736
737        Ok(KubeconfigResponse::new(data, node))
738    }
739
740    /// Reset a Talos node, optionally wiping disks.
741    ///
742    /// # Warning
743    ///
744    /// This is a **destructive** operation. The node will be reset and may
745    /// lose all data depending on the wipe mode configured.
746    ///
747    /// # Arguments
748    ///
749    /// * `request` - The reset request configuration
750    ///
751    /// # Example
752    ///
753    /// ```ignore
754    /// use talos_api_rs::{TalosClient, TalosClientConfig, ResetRequest};
755    ///
756    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
757    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
758    /// let client = TalosClient::new(config).await?;
759    ///
760    /// // Graceful reset (leaves etcd cluster first)
761    /// let response = client.reset(ResetRequest::graceful()).await?;
762    ///
763    /// // Force reset with full disk wipe
764    /// let response = client.reset(ResetRequest::force()).await?;
765    ///
766    /// // Custom reset
767    /// let response = client.reset(
768    ///     ResetRequest::builder()
769    ///         .graceful(true)
770    ///         .reboot(true)
771    ///         .build()
772    /// ).await?;
773    /// # Ok(())
774    /// # }
775    /// ```
776    pub async fn reset(&self, request: ResetRequest) -> Result<ResetResponse> {
777        let mut client = MachineServiceClient::new(self.channel.clone());
778
779        let proto_request: ProtoResetRequest = request.into();
780        let response = client.reset(proto_request).await?;
781        let inner = response.into_inner();
782
783        Ok(ResetResponse::from(inner))
784    }
785
786    /// Gracefully reset a Talos node.
787    ///
788    /// This is a convenience method that performs a graceful reset, which:
789    /// - Leaves the etcd cluster gracefully (for control plane nodes)
790    /// - Reboots after reset
791    /// - Does not wipe disks
792    ///
793    /// For more control, use [`reset`](Self::reset) with a custom [`ResetRequest`].
794    pub async fn reset_graceful(&self) -> Result<ResetResponse> {
795        self.reset(ResetRequest::graceful()).await
796    }
797
798    // =========================================================================
799    // etcd Operations
800    // =========================================================================
801
802    /// List etcd cluster members.
803    ///
804    /// # Example
805    ///
806    /// ```ignore
807    /// use talos_api_rs::{TalosClient, TalosClientConfig, EtcdMemberListRequest};
808    ///
809    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
810    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
811    /// let client = TalosClient::new(config).await?;
812    ///
813    /// let response = client.etcd_member_list(EtcdMemberListRequest::new()).await?;
814    /// for member in response.all_members() {
815    ///     println!("{}: {}", member.id, member.hostname);
816    /// }
817    /// # Ok(())
818    /// # }
819    /// ```
820    pub async fn etcd_member_list(
821        &self,
822        request: EtcdMemberListRequest,
823    ) -> Result<EtcdMemberListResponse> {
824        let mut client = MachineServiceClient::new(self.channel.clone());
825
826        let proto_request: ProtoEtcdMemberListRequest = request.into();
827        let response = client.etcd_member_list(proto_request).await?;
828        let inner = response.into_inner();
829
830        Ok(EtcdMemberListResponse::from(inner))
831    }
832
833    /// Remove an etcd member by ID.
834    ///
835    /// Use this to remove members that no longer have an associated Talos node.
836    /// For nodes that are still running, use [`etcd_leave_cluster`](Self::etcd_leave_cluster).
837    ///
838    /// # Example
839    ///
840    /// ```ignore
841    /// use talos_api_rs::{TalosClient, TalosClientConfig, EtcdRemoveMemberByIdRequest};
842    ///
843    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
844    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
845    /// let client = TalosClient::new(config).await?;
846    ///
847    /// // First, find the member ID
848    /// let members = client.etcd_member_list(Default::default()).await?;
849    /// if let Some(member) = members.find_by_hostname("old-node") {
850    ///     client.etcd_remove_member_by_id(
851    ///         EtcdRemoveMemberByIdRequest::new(member.id)
852    ///     ).await?;
853    /// }
854    /// # Ok(())
855    /// # }
856    /// ```
857    pub async fn etcd_remove_member_by_id(
858        &self,
859        request: EtcdRemoveMemberByIdRequest,
860    ) -> Result<EtcdRemoveMemberByIdResponse> {
861        let mut client = MachineServiceClient::new(self.channel.clone());
862
863        let proto_request: ProtoEtcdRemoveMemberByIdRequest = request.into();
864        let response = client.etcd_remove_member_by_id(proto_request).await?;
865        let inner = response.into_inner();
866
867        Ok(EtcdRemoveMemberByIdResponse::from(inner))
868    }
869
870    /// Make a node leave the etcd cluster gracefully.
871    ///
872    /// This should be called on the node that is being removed.
873    pub async fn etcd_leave_cluster(
874        &self,
875        request: EtcdLeaveClusterRequest,
876    ) -> Result<EtcdLeaveClusterResponse> {
877        let mut client = MachineServiceClient::new(self.channel.clone());
878
879        let proto_request: ProtoEtcdLeaveClusterRequest = request.into();
880        let response = client.etcd_leave_cluster(proto_request).await?;
881        let inner = response.into_inner();
882
883        Ok(EtcdLeaveClusterResponse::from(inner))
884    }
885
886    /// Forfeit etcd leadership.
887    ///
888    /// Causes the current leader to step down and trigger a new election.
889    pub async fn etcd_forfeit_leadership(
890        &self,
891        request: EtcdForfeitLeadershipRequest,
892    ) -> Result<EtcdForfeitLeadershipResponse> {
893        let mut client = MachineServiceClient::new(self.channel.clone());
894
895        let proto_request: ProtoEtcdForfeitLeadershipRequest = request.into();
896        let response = client.etcd_forfeit_leadership(proto_request).await?;
897        let inner = response.into_inner();
898
899        Ok(EtcdForfeitLeadershipResponse::from(inner))
900    }
901
902    /// Get etcd status for the current member.
903    pub async fn etcd_status(&self) -> Result<EtcdStatusResponse> {
904        let mut client = MachineServiceClient::new(self.channel.clone());
905
906        let response = client.etcd_status(()).await?;
907        let inner = response.into_inner();
908
909        Ok(EtcdStatusResponse::from(inner))
910    }
911
912    /// List etcd alarms.
913    pub async fn etcd_alarm_list(&self) -> Result<EtcdAlarmListResponse> {
914        let mut client = MachineServiceClient::new(self.channel.clone());
915
916        let response = client.etcd_alarm_list(()).await?;
917        let inner = response.into_inner();
918
919        Ok(EtcdAlarmListResponse::from(inner))
920    }
921
922    /// Disarm etcd alarms.
923    pub async fn etcd_alarm_disarm(&self) -> Result<EtcdAlarmDisarmResponse> {
924        let mut client = MachineServiceClient::new(self.channel.clone());
925
926        let response = client.etcd_alarm_disarm(()).await?;
927        let inner = response.into_inner();
928
929        Ok(EtcdAlarmDisarmResponse::from(inner))
930    }
931
932    /// Defragment etcd storage.
933    ///
934    /// **Warning**: This is a resource-heavy operation.
935    pub async fn etcd_defragment(&self) -> Result<EtcdDefragmentResponse> {
936        let mut client = MachineServiceClient::new(self.channel.clone());
937
938        let response = client.etcd_defragment(()).await?;
939        let inner = response.into_inner();
940
941        Ok(EtcdDefragmentResponse::from(inner))
942    }
943
944    // =========================================================================
945    // Diagnostics
946    // =========================================================================
947
948    /// Get kernel message buffer (dmesg).
949    ///
950    /// This is a server-streaming RPC that returns kernel messages.
951    ///
952    /// # Example
953    ///
954    /// ```ignore
955    /// use talos_api_rs::{TalosClient, TalosClientConfig, DmesgRequest};
956    ///
957    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
958    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
959    /// let client = TalosClient::new(config).await?;
960    ///
961    /// let dmesg = client.dmesg(DmesgRequest::new()).await?;
962    /// println!("{}", dmesg.as_string_lossy());
963    /// # Ok(())
964    /// # }
965    /// ```
966    pub async fn dmesg(&self, request: DmesgRequest) -> Result<DmesgResponse> {
967        use tonic::codegen::tokio_stream::StreamExt;
968
969        let mut client = MachineServiceClient::new(self.channel.clone());
970
971        let proto_request: ProtoDmesgRequest = request.into();
972        let response = client.dmesg(proto_request).await?;
973        let mut stream = response.into_inner();
974
975        let mut data = Vec::new();
976        let mut node = None;
977
978        while let Some(chunk) = stream.next().await {
979            let chunk = chunk?;
980            if node.is_none() {
981                if let Some(metadata) = &chunk.metadata {
982                    node = Some(metadata.hostname.clone());
983                }
984            }
985            data.extend(chunk.bytes);
986        }
987
988        Ok(DmesgResponse::new(data, node))
989    }
990
991    // =========================================================================
992    // Upgrade
993    // =========================================================================
994
995    /// Upgrade a Talos node to a new version.
996    ///
997    /// # Example
998    ///
999    /// ```ignore
1000    /// use talos_api_rs::{TalosClient, TalosClientConfig, UpgradeRequest};
1001    ///
1002    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1003    /// let config = TalosClientConfig::new("https://192.168.1.100:50000".parse()?);
1004    /// let client = TalosClient::new(config).await?;
1005    ///
1006    /// // Upgrade to a specific version
1007    /// let response = client.upgrade(
1008    ///     UpgradeRequest::new("ghcr.io/siderolabs/installer:v1.6.0")
1009    /// ).await?;
1010    ///
1011    /// // Staged upgrade (downloads but doesn't apply until reboot)
1012    /// let response = client.upgrade(
1013    ///     UpgradeRequest::builder("ghcr.io/siderolabs/installer:v1.6.0")
1014    ///         .stage(true)
1015    ///         .preserve(true)
1016    ///         .build()
1017    /// ).await?;
1018    /// # Ok(())
1019    /// # }
1020    /// ```
1021    pub async fn upgrade(&self, request: UpgradeRequest) -> Result<UpgradeResponse> {
1022        let mut client = MachineServiceClient::new(self.channel.clone());
1023
1024        let proto_request: ProtoUpgradeRequest = request.into();
1025        let response = client.upgrade(proto_request).await?;
1026        let inner = response.into_inner();
1027
1028        Ok(UpgradeResponse::from(inner))
1029    }
1030
1031    // =========================================================================
1032    // Service Management
1033    // =========================================================================
1034
1035    /// Start a service.
1036    pub async fn service_start(
1037        &self,
1038        request: ServiceStartRequest,
1039    ) -> Result<ServiceStartResponse> {
1040        let mut client = MachineServiceClient::new(self.channel.clone());
1041
1042        let proto_request: ProtoServiceStartRequest = request.into();
1043        let response = client.service_start(proto_request).await?;
1044        let inner = response.into_inner();
1045
1046        Ok(ServiceStartResponse::from(inner))
1047    }
1048
1049    /// Stop a service.
1050    pub async fn service_stop(&self, request: ServiceStopRequest) -> Result<ServiceStopResponse> {
1051        let mut client = MachineServiceClient::new(self.channel.clone());
1052
1053        let proto_request: ProtoServiceStopRequest = request.into();
1054        let response = client.service_stop(proto_request).await?;
1055        let inner = response.into_inner();
1056
1057        Ok(ServiceStopResponse::from(inner))
1058    }
1059
1060    /// Restart a service.
1061    pub async fn service_restart(
1062        &self,
1063        request: ServiceRestartRequest,
1064    ) -> Result<ServiceRestartResponse> {
1065        let mut client = MachineServiceClient::new(self.channel.clone());
1066
1067        let proto_request: ProtoServiceRestartRequest = request.into();
1068        let response = client.service_restart(proto_request).await?;
1069        let inner = response.into_inner();
1070
1071        Ok(ServiceRestartResponse::from(inner))
1072    }
1073
1074    /// Get service/container logs (server-streaming).
1075    pub async fn logs(&self, request: LogsRequest) -> Result<LogsResponse> {
1076        use tonic::codegen::tokio_stream::StreamExt;
1077
1078        let mut client = MachineServiceClient::new(self.channel.clone());
1079
1080        let proto_request: ProtoLogsRequest = request.into();
1081        let response = client.logs(proto_request).await?;
1082        let mut stream = response.into_inner();
1083
1084        let mut data = Vec::new();
1085        let mut node = None;
1086
1087        while let Some(chunk) = stream.next().await {
1088            let chunk = chunk?;
1089            if node.is_none() {
1090                if let Some(metadata) = &chunk.metadata {
1091                    node = Some(metadata.hostname.clone());
1092                }
1093            }
1094            data.extend(chunk.bytes);
1095        }
1096
1097        Ok(LogsResponse::new(data, node))
1098    }
1099
1100    // =========================================================================
1101    // System Information
1102    // =========================================================================
1103
1104    /// Get system load averages.
1105    pub async fn load_avg(&self) -> Result<LoadAvgResponse> {
1106        let mut client = MachineServiceClient::new(self.channel.clone());
1107
1108        let response = client.load_avg(()).await?;
1109        let inner = response.into_inner();
1110
1111        Ok(LoadAvgResponse::from(inner))
1112    }
1113
1114    /// Get memory information.
1115    pub async fn memory(&self) -> Result<MemoryResponse> {
1116        let mut client = MachineServiceClient::new(self.channel.clone());
1117
1118        let response = client.memory(()).await?;
1119        let inner = response.into_inner();
1120
1121        Ok(MemoryResponse::from(inner))
1122    }
1123
1124    /// Get CPU information.
1125    pub async fn cpu_info(&self) -> Result<CpuInfoResponse> {
1126        let mut client = MachineServiceClient::new(self.channel.clone());
1127
1128        let response = client.cpu_info(()).await?;
1129        let inner = response.into_inner();
1130
1131        Ok(CpuInfoResponse::from(inner))
1132    }
1133
1134    /// Get disk statistics.
1135    pub async fn disk_stats(&self) -> Result<DiskStatsResponse> {
1136        let mut client = MachineServiceClient::new(self.channel.clone());
1137
1138        let response = client.disk_stats(()).await?;
1139        let inner = response.into_inner();
1140
1141        Ok(DiskStatsResponse::from(inner))
1142    }
1143
1144    /// Get network device statistics.
1145    pub async fn network_device_stats(&self) -> Result<NetworkDeviceStatsResponse> {
1146        let mut client = MachineServiceClient::new(self.channel.clone());
1147
1148        let response = client.network_device_stats(()).await?;
1149        let inner = response.into_inner();
1150
1151        Ok(NetworkDeviceStatsResponse::from(inner))
1152    }
1153
1154    /// Get mount points.
1155    pub async fn mounts(&self) -> Result<MountsResponse> {
1156        let mut client = MachineServiceClient::new(self.channel.clone());
1157
1158        let response = client.mounts(()).await?;
1159        let inner = response.into_inner();
1160
1161        Ok(MountsResponse::from(inner))
1162    }
1163
1164    /// Get process list.
1165    pub async fn processes(&self) -> Result<ProcessesResponse> {
1166        let mut client = MachineServiceClient::new(self.channel.clone());
1167
1168        let response = client.processes(()).await?;
1169        let inner = response.into_inner();
1170
1171        Ok(ProcessesResponse::from(inner))
1172    }
1173
1174    // =========================================================================
1175    // File Operations
1176    // =========================================================================
1177
1178    /// List directory contents (server-streaming).
1179    pub async fn list(&self, request: ListRequest) -> Result<ListResponse> {
1180        use tonic::codegen::tokio_stream::StreamExt;
1181
1182        let mut client = MachineServiceClient::new(self.channel.clone());
1183
1184        let proto_request: ProtoListRequest = request.into();
1185        let response = client.list(proto_request).await?;
1186        let mut stream = response.into_inner();
1187
1188        let mut entries = Vec::new();
1189        while let Some(info) = stream.next().await {
1190            let info = info?;
1191            entries.push(FileInfo::from(info));
1192        }
1193
1194        Ok(ListResponse::new(entries))
1195    }
1196
1197    /// Read a file (server-streaming).
1198    pub async fn read(&self, request: ReadRequest) -> Result<ReadResponse> {
1199        use tonic::codegen::tokio_stream::StreamExt;
1200
1201        let mut client = MachineServiceClient::new(self.channel.clone());
1202
1203        let proto_request: ProtoReadRequest = request.into();
1204        let response = client.read(proto_request).await?;
1205        let mut stream = response.into_inner();
1206
1207        let mut data = Vec::new();
1208        let mut node = None;
1209
1210        while let Some(chunk) = stream.next().await {
1211            let chunk = chunk?;
1212            if node.is_none() {
1213                if let Some(metadata) = &chunk.metadata {
1214                    node = Some(metadata.hostname.clone());
1215                }
1216            }
1217            data.extend(chunk.bytes);
1218        }
1219
1220        Ok(ReadResponse::new(data, node))
1221    }
1222
1223    /// Copy a file or directory as tar archive (server-streaming).
1224    pub async fn copy(&self, request: CopyRequest) -> Result<CopyResponse> {
1225        use tonic::codegen::tokio_stream::StreamExt;
1226
1227        let mut client = MachineServiceClient::new(self.channel.clone());
1228
1229        let proto_request: ProtoCopyRequest = request.into();
1230        let response = client.copy(proto_request).await?;
1231        let mut stream = response.into_inner();
1232
1233        let mut data = Vec::new();
1234        let mut node = None;
1235
1236        while let Some(chunk) = stream.next().await {
1237            let chunk = chunk?;
1238            if node.is_none() {
1239                if let Some(metadata) = &chunk.metadata {
1240                    node = Some(metadata.hostname.clone());
1241                }
1242            }
1243            data.extend(chunk.bytes);
1244        }
1245
1246        Ok(CopyResponse::new(data, node))
1247    }
1248
1249    /// Get disk usage (server-streaming).
1250    pub async fn disk_usage(&self, request: DiskUsageRequest) -> Result<DiskUsageResponse> {
1251        use tonic::codegen::tokio_stream::StreamExt;
1252
1253        let mut client = MachineServiceClient::new(self.channel.clone());
1254
1255        let proto_request: ProtoDiskUsageRequest = request.into();
1256        let response = client.disk_usage(proto_request).await?;
1257        let mut stream = response.into_inner();
1258
1259        let mut entries = Vec::new();
1260        while let Some(info) = stream.next().await {
1261            let info = info?;
1262            entries.push(DiskUsageInfo::from(info));
1263        }
1264
1265        Ok(DiskUsageResponse::new(entries))
1266    }
1267
1268    // =========================================================================
1269    // Advanced APIs
1270    // =========================================================================
1271
1272    /// Rollback a Talos node to the previous installed version.
1273    pub async fn rollback(&self) -> Result<RollbackResponse> {
1274        let mut client = MachineServiceClient::new(self.channel.clone());
1275
1276        let response = client.rollback(ProtoRollbackRequest {}).await?;
1277        let inner = response.into_inner();
1278
1279        Ok(RollbackResponse::from(inner))
1280    }
1281
1282    /// Generate client configuration (talosconfig).
1283    pub async fn generate_client_configuration(
1284        &self,
1285        request: GenerateClientConfigurationRequest,
1286    ) -> Result<GenerateClientConfigurationResponse> {
1287        let mut client = MachineServiceClient::new(self.channel.clone());
1288
1289        let proto_request: ProtoGenerateClientConfigRequest = request.into();
1290        let response = client.generate_client_configuration(proto_request).await?;
1291        let inner = response.into_inner();
1292
1293        Ok(GenerateClientConfigurationResponse::from(inner))
1294    }
1295
1296    /// Capture packets on a network interface (server-streaming).
1297    pub async fn packet_capture(
1298        &self,
1299        request: PacketCaptureRequest,
1300    ) -> Result<PacketCaptureResponse> {
1301        use tonic::codegen::tokio_stream::StreamExt;
1302
1303        let mut client = MachineServiceClient::new(self.channel.clone());
1304
1305        let proto_request: ProtoPacketCaptureRequest = request.into();
1306        let response = client.packet_capture(proto_request).await?;
1307        let mut stream = response.into_inner();
1308
1309        let mut data = Vec::new();
1310        let mut node = None;
1311
1312        while let Some(chunk) = stream.next().await {
1313            let chunk = chunk?;
1314            if node.is_none() {
1315                if let Some(metadata) = &chunk.metadata {
1316                    node = Some(metadata.hostname.clone());
1317                }
1318            }
1319            data.extend(chunk.bytes);
1320        }
1321
1322        Ok(PacketCaptureResponse::new(data, node))
1323    }
1324
1325    /// Get network connection information (netstat).
1326    pub async fn netstat(&self, request: NetstatRequest) -> Result<NetstatResponse> {
1327        let mut client = MachineServiceClient::new(self.channel.clone());
1328
1329        let proto_request: ProtoNetstatRequest = request.into();
1330        let response = client.netstat(proto_request).await?;
1331        let inner = response.into_inner();
1332
1333        Ok(NetstatResponse::from(inner))
1334    }
1335}
1336
1337// Helper for insecure mode
1338#[derive(Debug)]
1339struct NoVerifier;
1340
1341impl rustls::client::danger::ServerCertVerifier for NoVerifier {
1342    fn verify_server_cert(
1343        &self,
1344        _end_entity: &rustls::pki_types::CertificateDer<'_>,
1345        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
1346        _server_name: &rustls::pki_types::ServerName<'_>,
1347        _ocsp_response: &[u8],
1348        _now: rustls::pki_types::UnixTime,
1349    ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
1350        Ok(rustls::client::danger::ServerCertVerified::assertion())
1351    }
1352
1353    fn verify_tls12_signature(
1354        &self,
1355        _message: &[u8],
1356        _cert: &rustls::pki_types::CertificateDer<'_>,
1357        _dss: &rustls::DigitallySignedStruct,
1358    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1359        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1360    }
1361
1362    fn verify_tls13_signature(
1363        &self,
1364        _message: &[u8],
1365        _cert: &rustls::pki_types::CertificateDer<'_>,
1366        _dss: &rustls::DigitallySignedStruct,
1367    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1368        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1369    }
1370
1371    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1372        vec![
1373            rustls::SignatureScheme::RSA_PKCS1_SHA1,
1374            rustls::SignatureScheme::ECDSA_SHA1_Legacy,
1375            rustls::SignatureScheme::RSA_PKCS1_SHA256,
1376            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
1377            rustls::SignatureScheme::RSA_PKCS1_SHA384,
1378            rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
1379            rustls::SignatureScheme::RSA_PKCS1_SHA512,
1380            rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
1381            rustls::SignatureScheme::RSA_PSS_SHA256,
1382            rustls::SignatureScheme::RSA_PSS_SHA384,
1383            rustls::SignatureScheme::RSA_PSS_SHA512,
1384            rustls::SignatureScheme::ED25519,
1385            rustls::SignatureScheme::ED448,
1386        ]
1387    }
1388}
1389
// Private submodule that defines the types re-exported just below.
mod pool;

// Re-export the pool types so callers can name them through this module.
pub use pool::{ConnectionPool, ConnectionPoolConfig, EndpointHealth, HealthStatus, LoadBalancer};

// The `tests` submodule is compiled only when building with `cfg(test)`.
#[cfg(test)]
mod tests;