1use crate::api::machine::machine_service_client::MachineServiceClient;
4use crate::api::machine::ApplyConfigurationRequest as ProtoApplyConfigRequest;
5use crate::api::machine::BootstrapRequest as ProtoBootstrapRequest;
6use crate::api::machine::CopyRequest as ProtoCopyRequest;
7use crate::api::machine::DiskUsageRequest as ProtoDiskUsageRequest;
8use crate::api::machine::DmesgRequest as ProtoDmesgRequest;
9use crate::api::machine::EtcdForfeitLeadershipRequest as ProtoEtcdForfeitLeadershipRequest;
10use crate::api::machine::EtcdLeaveClusterRequest as ProtoEtcdLeaveClusterRequest;
11use crate::api::machine::EtcdMemberListRequest as ProtoEtcdMemberListRequest;
12use crate::api::machine::EtcdRemoveMemberByIdRequest as ProtoEtcdRemoveMemberByIdRequest;
13use crate::api::machine::GenerateClientConfigurationRequest as ProtoGenerateClientConfigRequest;
14use crate::api::machine::ListRequest as ProtoListRequest;
15use crate::api::machine::LogsRequest as ProtoLogsRequest;
16use crate::api::machine::NetstatRequest as ProtoNetstatRequest;
17use crate::api::machine::PacketCaptureRequest as ProtoPacketCaptureRequest;
18use crate::api::machine::ReadRequest as ProtoReadRequest;
19use crate::api::machine::ResetRequest as ProtoResetRequest;
20use crate::api::machine::RollbackRequest as ProtoRollbackRequest;
21use crate::api::machine::ServiceRestartRequest as ProtoServiceRestartRequest;
22use crate::api::machine::ServiceStartRequest as ProtoServiceStartRequest;
23use crate::api::machine::ServiceStopRequest as ProtoServiceStopRequest;
24use crate::api::machine::UpgradeRequest as ProtoUpgradeRequest;
25use crate::api::version::version_service_client::VersionServiceClient;
26use crate::error::Result;
27use crate::resources::{
28 ApplyConfigurationRequest, ApplyConfigurationResponse, BootstrapRequest, BootstrapResponse,
29 CopyRequest, CopyResponse, CpuInfoResponse, DiskStatsResponse, DiskUsageInfo, DiskUsageRequest,
30 DiskUsageResponse, DmesgRequest, DmesgResponse, EtcdAlarmDisarmResponse, EtcdAlarmListResponse,
31 EtcdDefragmentResponse, EtcdForfeitLeadershipRequest, EtcdForfeitLeadershipResponse,
32 EtcdLeaveClusterRequest, EtcdLeaveClusterResponse, EtcdMemberListRequest,
33 EtcdMemberListResponse, EtcdRemoveMemberByIdRequest, EtcdRemoveMemberByIdResponse,
34 EtcdStatusResponse, FileInfo, GenerateClientConfigurationRequest,
35 GenerateClientConfigurationResponse, KubeconfigResponse, ListRequest, ListResponse,
36 LoadAvgResponse, LogsRequest, LogsResponse, MemoryResponse, MountsResponse, NetstatRequest,
37 NetstatResponse, NetworkDeviceStatsResponse, PacketCaptureRequest, PacketCaptureResponse,
38 ProcessesResponse, ReadRequest, ReadResponse, ResetRequest, ResetResponse, RollbackResponse,
39 ServiceRestartRequest, ServiceRestartResponse, ServiceStartRequest, ServiceStartResponse,
40 ServiceStopRequest, ServiceStopResponse, UpgradeRequest, UpgradeResponse,
41};
42use hyper_util::rt::TokioIo;
43use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName};
44use std::sync::Arc;
45use std::time::Duration;
46use tonic::transport::{Channel, Endpoint};
47
/// Connection settings consumed by [`TalosClient::new`].
#[derive(Clone, Debug)]
pub struct TalosClientConfig {
    /// Endpoint to connect to; the default is `https://127.0.0.1:50000`.
    /// An `http://` prefix selects a plaintext channel.
    pub endpoint: String,
    /// Path to a PEM file holding the client certificate (chain).
    pub crt_path: Option<String>,
    /// Path to a PEM file holding the client private key.
    pub key_path: Option<String>,
    /// Path to a PEM file holding the CA certificate(s) used to verify the server.
    /// When unset, the bundled `webpki_roots` trust anchors are used.
    pub ca_path: Option<String>,
    /// When `true` (and the endpoint is not `http://`), the server certificate is not verified.
    pub insecure: bool,
    /// Limit applied to connection establishment; `None` leaves it unset.
    pub connect_timeout: Option<Duration>,
    /// Per-request timeout set on the channel; `None` leaves it unset.
    pub request_timeout: Option<Duration>,
    /// HTTP/2 keepalive interval; only applied when `keepalive_timeout` is also set.
    pub keepalive_interval: Option<Duration>,
    /// Keepalive timeout; only applied when `keepalive_interval` is also set.
    pub keepalive_timeout: Option<Duration>,
}
70
71impl Default for TalosClientConfig {
72 fn default() -> Self {
73 Self {
74 endpoint: "https://127.0.0.1:50000".to_string(),
75 crt_path: None,
76 key_path: None,
77 ca_path: None,
78 insecure: false,
79 connect_timeout: Some(Duration::from_secs(10)),
80 request_timeout: Some(Duration::from_secs(30)),
81 keepalive_interval: Some(Duration::from_secs(30)),
82 keepalive_timeout: Some(Duration::from_secs(10)),
83 }
84 }
85}
86
87impl TalosClientConfig {
88 #[must_use]
90 pub fn new(endpoint: impl Into<String>) -> Self {
91 Self {
92 endpoint: endpoint.into(),
93 ..Default::default()
94 }
95 }
96
97 #[must_use]
99 pub fn builder(endpoint: impl Into<String>) -> TalosClientConfigBuilder {
100 TalosClientConfigBuilder::new(endpoint)
101 }
102
103 #[must_use]
105 pub fn with_client_cert(mut self, crt_path: impl Into<String>) -> Self {
106 self.crt_path = Some(crt_path.into());
107 self
108 }
109
110 #[must_use]
112 pub fn with_client_key(mut self, key_path: impl Into<String>) -> Self {
113 self.key_path = Some(key_path.into());
114 self
115 }
116
117 #[must_use]
119 pub fn with_ca(mut self, ca_path: impl Into<String>) -> Self {
120 self.ca_path = Some(ca_path.into());
121 self
122 }
123
124 #[must_use]
126 pub fn insecure(mut self) -> Self {
127 self.insecure = true;
128 self
129 }
130
131 #[must_use]
133 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
134 self.connect_timeout = Some(timeout);
135 self
136 }
137
138 #[must_use]
140 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
141 self.request_timeout = Some(timeout);
142 self
143 }
144
145 #[must_use]
147 pub fn no_timeout(mut self) -> Self {
148 self.connect_timeout = None;
149 self.request_timeout = None;
150 self
151 }
152}
153
/// Step-by-step builder for [`TalosClientConfig`]; its fields mirror the config one-to-one.
#[derive(Debug, Clone)]
pub struct TalosClientConfigBuilder {
    // Endpoint handed to `new`.
    endpoint: String,
    // Optional paths to PEM files: client certificate, client key, CA certificate.
    crt_path: Option<String>,
    key_path: Option<String>,
    ca_path: Option<String>,
    // Set by `insecure()`.
    insecure: bool,
    // Timeouts; `None` means "not set" (see `no_timeout`).
    connect_timeout: Option<Duration>,
    request_timeout: Option<Duration>,
    // Keepalive pair, always set together by `keepalive`.
    keepalive_interval: Option<Duration>,
    keepalive_timeout: Option<Duration>,
}
167
168impl TalosClientConfigBuilder {
169 #[must_use]
171 pub fn new(endpoint: impl Into<String>) -> Self {
172 Self {
173 endpoint: endpoint.into(),
174 crt_path: None,
175 key_path: None,
176 ca_path: None,
177 insecure: false,
178 connect_timeout: Some(Duration::from_secs(10)),
179 request_timeout: Some(Duration::from_secs(30)),
180 keepalive_interval: Some(Duration::from_secs(30)),
181 keepalive_timeout: Some(Duration::from_secs(10)),
182 }
183 }
184
185 #[must_use]
187 pub fn client_cert(mut self, path: impl Into<String>) -> Self {
188 self.crt_path = Some(path.into());
189 self
190 }
191
192 #[must_use]
194 pub fn client_key(mut self, path: impl Into<String>) -> Self {
195 self.key_path = Some(path.into());
196 self
197 }
198
199 #[must_use]
201 pub fn ca_cert(mut self, path: impl Into<String>) -> Self {
202 self.ca_path = Some(path.into());
203 self
204 }
205
206 #[must_use]
208 pub fn insecure(mut self) -> Self {
209 self.insecure = true;
210 self
211 }
212
213 #[must_use]
215 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
216 self.connect_timeout = Some(timeout);
217 self
218 }
219
220 #[must_use]
222 pub fn request_timeout(mut self, timeout: Duration) -> Self {
223 self.request_timeout = Some(timeout);
224 self
225 }
226
227 #[must_use]
229 pub fn keepalive(mut self, interval: Duration, timeout: Duration) -> Self {
230 self.keepalive_interval = Some(interval);
231 self.keepalive_timeout = Some(timeout);
232 self
233 }
234
235 #[must_use]
237 pub fn no_timeout(mut self) -> Self {
238 self.connect_timeout = None;
239 self.request_timeout = None;
240 self
241 }
242
243 #[must_use]
245 pub fn build(self) -> TalosClientConfig {
246 TalosClientConfig {
247 endpoint: self.endpoint,
248 crt_path: self.crt_path,
249 key_path: self.key_path,
250 ca_path: self.ca_path,
251 insecure: self.insecure,
252 connect_timeout: self.connect_timeout,
253 request_timeout: self.request_timeout,
254 keepalive_interval: self.keepalive_interval,
255 keepalive_timeout: self.keepalive_timeout,
256 }
257 }
258}
259
/// Handle to a connected endpoint; service clients are created from clones of its channel.
#[derive(Clone)]
pub struct TalosClient {
    // The configuration the channel was built from. Nothing in this file reads it back,
    // hence the lint allowance (NOTE(review): confirm whether submodules use it).
    #[allow(dead_code)]
    config: TalosClientConfig,
    // Connected channel, cloned into every generated service client.
    channel: Channel,
}
266
267impl TalosClient {
268 pub async fn new(config: TalosClientConfig) -> Result<Self> {
269 let _ = rustls::crypto::ring::default_provider().install_default();
271
272 let is_http = config.endpoint.starts_with("http://");
274
275 let channel = if is_http {
276 Self::create_http_channel(&config).await?
278 } else if config.insecure {
279 Self::create_insecure_channel(&config).await?
280 } else {
281 Self::create_mtls_channel(&config).await?
282 };
283
284 Ok(Self { config, channel })
285 }
286
287 async fn create_http_channel(config: &TalosClientConfig) -> Result<Channel> {
289 let mut endpoint = Channel::from_shared(config.endpoint.clone())
290 .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
291
292 if let Some(timeout) = config.connect_timeout {
294 endpoint = endpoint.connect_timeout(timeout);
295 }
296 if let Some(timeout) = config.request_timeout {
297 endpoint = endpoint.timeout(timeout);
298 }
299 if let Some(interval) = config.keepalive_interval {
300 if let Some(ka_timeout) = config.keepalive_timeout {
301 endpoint = endpoint
302 .http2_keep_alive_interval(interval)
303 .keep_alive_timeout(ka_timeout);
304 }
305 }
306
307 let channel = endpoint.connect().await?;
308 Ok(channel)
309 }
310
311 async fn create_insecure_channel(config: &TalosClientConfig) -> Result<Channel> {
313 let tls_config = rustls::ClientConfig::builder()
314 .with_root_certificates(rustls::RootCertStore::empty())
315 .with_no_client_auth();
316
317 Self::connect_with_custom_tls(config, tls_config, true).await
318 }
319
320 async fn create_mtls_channel(config: &TalosClientConfig) -> Result<Channel> {
322 let root_store = if let Some(ca_path) = &config.ca_path {
324 let ca_pem = std::fs::read(ca_path).map_err(|e| {
325 crate::error::TalosError::Config(format!("Failed to read CA cert: {e}"))
326 })?;
327 let mut root_store = rustls::RootCertStore::empty();
328 let certs = Self::load_pem_certs(&ca_pem)?;
329 for cert in certs {
330 root_store.add(cert).map_err(|e| {
331 crate::error::TalosError::Config(format!("Failed to add CA cert: {e}"))
332 })?;
333 }
334 root_store
335 } else {
336 let mut root_store = rustls::RootCertStore::empty();
338 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
339 root_store
340 };
341
342 let tls_config =
344 if let (Some(crt_path), Some(key_path)) = (&config.crt_path, &config.key_path) {
345 let cert_pem = std::fs::read(crt_path).map_err(|e| {
347 crate::error::TalosError::Config(format!("Failed to read client cert: {e}"))
348 })?;
349 let key_pem = std::fs::read(key_path).map_err(|e| {
350 crate::error::TalosError::Config(format!("Failed to read client key: {e}"))
351 })?;
352
353 let client_certs = Self::load_pem_certs(&cert_pem)?;
354 let client_key = Self::load_pem_key(&key_pem)?;
355
356 rustls::ClientConfig::builder()
357 .with_root_certificates(root_store)
358 .with_client_auth_cert(client_certs, client_key)
359 .map_err(|e| {
360 crate::error::TalosError::Config(format!(
361 "Failed to configure client auth: {e}"
362 ))
363 })?
364 } else {
365 rustls::ClientConfig::builder()
367 .with_root_certificates(root_store)
368 .with_no_client_auth()
369 };
370
371 Self::connect_with_custom_tls(config, tls_config, false).await
372 }
373
374 async fn connect_with_custom_tls(
376 config: &TalosClientConfig,
377 mut tls_config: rustls::ClientConfig,
378 skip_verification: bool,
379 ) -> Result<Channel> {
380 if skip_verification {
382 tls_config
383 .dangerous()
384 .set_certificate_verifier(Arc::new(NoVerifier));
385 }
386
387 tls_config.alpn_protocols = vec![b"h2".to_vec()];
389 let tls_config = Arc::new(tls_config);
390 let connector = tokio_rustls::TlsConnector::from(tls_config);
391
392 let endpoint_url = if config.endpoint.starts_with("http") {
394 config.endpoint.clone()
395 } else {
396 format!("https://{}", config.endpoint)
397 };
398 let parsed_url = url::Url::parse(&endpoint_url)
399 .map_err(|e| crate::error::TalosError::Config(format!("Invalid endpoint URL: {e}")))?;
400 let host = parsed_url
401 .host_str()
402 .ok_or_else(|| crate::error::TalosError::Config("No host in endpoint".to_string()))?
403 .to_string();
404 let port = parsed_url.port().unwrap_or(50000);
405
406 let endpoint_for_connector = format!("http://{}:{}", host, port);
408
409 let mut endpoint = Endpoint::from_shared(endpoint_for_connector)
411 .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
412
413 if let Some(timeout) = config.connect_timeout {
415 endpoint = endpoint.connect_timeout(timeout);
416 }
417 if let Some(timeout) = config.request_timeout {
418 endpoint = endpoint.timeout(timeout);
419 }
420 if let Some(interval) = config.keepalive_interval {
421 if let Some(ka_timeout) = config.keepalive_timeout {
422 endpoint = endpoint
423 .http2_keep_alive_interval(interval)
424 .keep_alive_timeout(ka_timeout);
425 }
426 }
427
428 let channel = endpoint
429 .connect_with_connector(tower::service_fn(move |uri: tonic::transport::Uri| {
430 let connector = connector.clone();
431 let host = host.clone();
432 async move {
433 let uri_host = uri.host().unwrap_or("127.0.0.1");
434 let uri_port = uri.port_u16().unwrap_or(50000);
435 let addr = format!("{}:{}", uri_host, uri_port);
436
437 let tcp = tokio::net::TcpStream::connect(addr).await?;
438
439 let server_name = ServerName::try_from(host.clone())
441 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
442
443 let tls_stream = connector.connect(server_name, tcp).await?;
444 Ok::<_, std::io::Error>(TokioIo::new(tls_stream))
445 }
446 }))
447 .await?;
448
449 Ok(channel)
450 }
451
452 #[allow(clippy::result_large_err)]
454 fn load_pem_certs(pem_data: &[u8]) -> Result<Vec<CertificateDer<'static>>> {
455 let mut reader = std::io::BufReader::new(pem_data);
456 let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
457 .collect::<std::result::Result<Vec<_>, _>>()
458 .map_err(|e| {
459 crate::error::TalosError::Config(format!("Failed to parse PEM certificates: {e}"))
460 })?;
461 if certs.is_empty() {
462 return Err(crate::error::TalosError::Config(
463 "No certificates found in PEM data".to_string(),
464 ));
465 }
466 Ok(certs)
467 }
468
469 #[allow(clippy::result_large_err)]
471 fn load_pem_key(pem_data: &[u8]) -> Result<PrivateKeyDer<'static>> {
472 let mut reader = std::io::BufReader::new(pem_data);
474
475 loop {
476 match rustls_pemfile::read_one(&mut reader) {
477 Ok(Some(rustls_pemfile::Item::Pkcs1Key(key))) => {
478 return Ok(PrivateKeyDer::Pkcs1(key));
479 }
480 Ok(Some(rustls_pemfile::Item::Pkcs8Key(key))) => {
481 return Ok(PrivateKeyDer::Pkcs8(key));
482 }
483 Ok(Some(rustls_pemfile::Item::Sec1Key(key))) => {
484 return Ok(PrivateKeyDer::Sec1(key));
485 }
486 Ok(Some(_)) => {
487 continue;
489 }
490 Ok(None) => {
491 break;
492 }
493 Err(e) => {
494 return Err(crate::error::TalosError::Config(format!(
495 "Failed to parse PEM key: {e}"
496 )));
497 }
498 }
499 }
500
501 let pem_str = std::str::from_utf8(pem_data)
504 .map_err(|e| crate::error::TalosError::Config(format!("Invalid UTF-8 in key: {e}")))?;
505
506 if pem_str.contains("-----BEGIN ED25519 PRIVATE KEY-----") {
507 let start_marker = "-----BEGIN ED25519 PRIVATE KEY-----";
509 let end_marker = "-----END ED25519 PRIVATE KEY-----";
510
511 if let Some(start) = pem_str.find(start_marker) {
512 if let Some(end) = pem_str.find(end_marker) {
513 let base64_content = &pem_str[start + start_marker.len()..end];
514 let base64_clean: String = base64_content
515 .chars()
516 .filter(|c| !c.is_whitespace())
517 .collect();
518
519 let der_bytes = base64::Engine::decode(
520 &base64::engine::general_purpose::STANDARD,
521 &base64_clean,
522 )
523 .map_err(|e| {
524 crate::error::TalosError::Config(format!(
525 "Failed to decode ED25519 key: {e}"
526 ))
527 })?;
528
529 return Ok(PrivateKeyDer::Pkcs8(
531 rustls::pki_types::PrivatePkcs8KeyDer::from(der_bytes),
532 ));
533 }
534 }
535 }
536
537 Err(crate::error::TalosError::Config(
538 "No private key found in PEM data".to_string(),
539 ))
540 }
541
542 pub fn version(&self) -> VersionServiceClient<Channel> {
544 VersionServiceClient::new(self.channel.clone())
545 }
546
547 pub fn machine(&self) -> MachineServiceClient<Channel> {
549 MachineServiceClient::new(self.channel.clone())
550 }
551
552 pub async fn apply_configuration(
589 &self,
590 request: ApplyConfigurationRequest,
591 ) -> Result<ApplyConfigurationResponse> {
592 let proto_request: ProtoApplyConfigRequest = request.into();
593 let response = self
594 .machine()
595 .apply_configuration(proto_request)
596 .await?
597 .into_inner();
598 Ok(response.into())
599 }
600
601 pub async fn apply_configuration_yaml(
618 &self,
619 yaml: &str,
620 mode: crate::ApplyMode,
621 dry_run: bool,
622 ) -> Result<ApplyConfigurationResponse> {
623 let request = ApplyConfigurationRequest::builder()
624 .config_yaml(yaml)
625 .mode(mode)
626 .dry_run(dry_run)
627 .build();
628 self.apply_configuration(request).await
629 }
630
631 pub async fn bootstrap(&self, request: BootstrapRequest) -> Result<BootstrapResponse> {
672 let proto_request: ProtoBootstrapRequest = request.into();
673 let response = self.machine().bootstrap(proto_request).await?.into_inner();
674 Ok(response.into())
675 }
676
677 pub async fn bootstrap_cluster(&self) -> Result<BootstrapResponse> {
681 self.bootstrap(BootstrapRequest::new()).await
682 }
683
684 pub async fn kubeconfig(&self) -> Result<KubeconfigResponse> {
719 use tonic::codegen::tokio_stream::StreamExt;
720
721 let mut stream = self.machine().kubeconfig(()).await?.into_inner();
722
723 let mut data = Vec::new();
724 let mut node = None;
725
726 while let Some(chunk) = stream.next().await {
727 let chunk = chunk?;
728 if node.is_none() {
730 if let Some(metadata) = &chunk.metadata {
731 node = Some(metadata.hostname.clone());
732 }
733 }
734 data.extend(chunk.bytes);
735 }
736
737 Ok(KubeconfigResponse::new(data, node))
738 }
739
740 pub async fn reset(&self, request: ResetRequest) -> Result<ResetResponse> {
777 let mut client = MachineServiceClient::new(self.channel.clone());
778
779 let proto_request: ProtoResetRequest = request.into();
780 let response = client.reset(proto_request).await?;
781 let inner = response.into_inner();
782
783 Ok(ResetResponse::from(inner))
784 }
785
786 pub async fn reset_graceful(&self) -> Result<ResetResponse> {
795 self.reset(ResetRequest::graceful()).await
796 }
797
798 pub async fn etcd_member_list(
821 &self,
822 request: EtcdMemberListRequest,
823 ) -> Result<EtcdMemberListResponse> {
824 let mut client = MachineServiceClient::new(self.channel.clone());
825
826 let proto_request: ProtoEtcdMemberListRequest = request.into();
827 let response = client.etcd_member_list(proto_request).await?;
828 let inner = response.into_inner();
829
830 Ok(EtcdMemberListResponse::from(inner))
831 }
832
833 pub async fn etcd_remove_member_by_id(
858 &self,
859 request: EtcdRemoveMemberByIdRequest,
860 ) -> Result<EtcdRemoveMemberByIdResponse> {
861 let mut client = MachineServiceClient::new(self.channel.clone());
862
863 let proto_request: ProtoEtcdRemoveMemberByIdRequest = request.into();
864 let response = client.etcd_remove_member_by_id(proto_request).await?;
865 let inner = response.into_inner();
866
867 Ok(EtcdRemoveMemberByIdResponse::from(inner))
868 }
869
870 pub async fn etcd_leave_cluster(
874 &self,
875 request: EtcdLeaveClusterRequest,
876 ) -> Result<EtcdLeaveClusterResponse> {
877 let mut client = MachineServiceClient::new(self.channel.clone());
878
879 let proto_request: ProtoEtcdLeaveClusterRequest = request.into();
880 let response = client.etcd_leave_cluster(proto_request).await?;
881 let inner = response.into_inner();
882
883 Ok(EtcdLeaveClusterResponse::from(inner))
884 }
885
886 pub async fn etcd_forfeit_leadership(
890 &self,
891 request: EtcdForfeitLeadershipRequest,
892 ) -> Result<EtcdForfeitLeadershipResponse> {
893 let mut client = MachineServiceClient::new(self.channel.clone());
894
895 let proto_request: ProtoEtcdForfeitLeadershipRequest = request.into();
896 let response = client.etcd_forfeit_leadership(proto_request).await?;
897 let inner = response.into_inner();
898
899 Ok(EtcdForfeitLeadershipResponse::from(inner))
900 }
901
902 pub async fn etcd_status(&self) -> Result<EtcdStatusResponse> {
904 let mut client = MachineServiceClient::new(self.channel.clone());
905
906 let response = client.etcd_status(()).await?;
907 let inner = response.into_inner();
908
909 Ok(EtcdStatusResponse::from(inner))
910 }
911
912 pub async fn etcd_alarm_list(&self) -> Result<EtcdAlarmListResponse> {
914 let mut client = MachineServiceClient::new(self.channel.clone());
915
916 let response = client.etcd_alarm_list(()).await?;
917 let inner = response.into_inner();
918
919 Ok(EtcdAlarmListResponse::from(inner))
920 }
921
922 pub async fn etcd_alarm_disarm(&self) -> Result<EtcdAlarmDisarmResponse> {
924 let mut client = MachineServiceClient::new(self.channel.clone());
925
926 let response = client.etcd_alarm_disarm(()).await?;
927 let inner = response.into_inner();
928
929 Ok(EtcdAlarmDisarmResponse::from(inner))
930 }
931
932 pub async fn etcd_defragment(&self) -> Result<EtcdDefragmentResponse> {
936 let mut client = MachineServiceClient::new(self.channel.clone());
937
938 let response = client.etcd_defragment(()).await?;
939 let inner = response.into_inner();
940
941 Ok(EtcdDefragmentResponse::from(inner))
942 }
943
944 pub async fn dmesg(&self, request: DmesgRequest) -> Result<DmesgResponse> {
967 use tonic::codegen::tokio_stream::StreamExt;
968
969 let mut client = MachineServiceClient::new(self.channel.clone());
970
971 let proto_request: ProtoDmesgRequest = request.into();
972 let response = client.dmesg(proto_request).await?;
973 let mut stream = response.into_inner();
974
975 let mut data = Vec::new();
976 let mut node = None;
977
978 while let Some(chunk) = stream.next().await {
979 let chunk = chunk?;
980 if node.is_none() {
981 if let Some(metadata) = &chunk.metadata {
982 node = Some(metadata.hostname.clone());
983 }
984 }
985 data.extend(chunk.bytes);
986 }
987
988 Ok(DmesgResponse::new(data, node))
989 }
990
991 pub async fn upgrade(&self, request: UpgradeRequest) -> Result<UpgradeResponse> {
1022 let mut client = MachineServiceClient::new(self.channel.clone());
1023
1024 let proto_request: ProtoUpgradeRequest = request.into();
1025 let response = client.upgrade(proto_request).await?;
1026 let inner = response.into_inner();
1027
1028 Ok(UpgradeResponse::from(inner))
1029 }
1030
1031 pub async fn service_start(
1037 &self,
1038 request: ServiceStartRequest,
1039 ) -> Result<ServiceStartResponse> {
1040 let mut client = MachineServiceClient::new(self.channel.clone());
1041
1042 let proto_request: ProtoServiceStartRequest = request.into();
1043 let response = client.service_start(proto_request).await?;
1044 let inner = response.into_inner();
1045
1046 Ok(ServiceStartResponse::from(inner))
1047 }
1048
1049 pub async fn service_stop(&self, request: ServiceStopRequest) -> Result<ServiceStopResponse> {
1051 let mut client = MachineServiceClient::new(self.channel.clone());
1052
1053 let proto_request: ProtoServiceStopRequest = request.into();
1054 let response = client.service_stop(proto_request).await?;
1055 let inner = response.into_inner();
1056
1057 Ok(ServiceStopResponse::from(inner))
1058 }
1059
1060 pub async fn service_restart(
1062 &self,
1063 request: ServiceRestartRequest,
1064 ) -> Result<ServiceRestartResponse> {
1065 let mut client = MachineServiceClient::new(self.channel.clone());
1066
1067 let proto_request: ProtoServiceRestartRequest = request.into();
1068 let response = client.service_restart(proto_request).await?;
1069 let inner = response.into_inner();
1070
1071 Ok(ServiceRestartResponse::from(inner))
1072 }
1073
1074 pub async fn logs(&self, request: LogsRequest) -> Result<LogsResponse> {
1076 use tonic::codegen::tokio_stream::StreamExt;
1077
1078 let mut client = MachineServiceClient::new(self.channel.clone());
1079
1080 let proto_request: ProtoLogsRequest = request.into();
1081 let response = client.logs(proto_request).await?;
1082 let mut stream = response.into_inner();
1083
1084 let mut data = Vec::new();
1085 let mut node = None;
1086
1087 while let Some(chunk) = stream.next().await {
1088 let chunk = chunk?;
1089 if node.is_none() {
1090 if let Some(metadata) = &chunk.metadata {
1091 node = Some(metadata.hostname.clone());
1092 }
1093 }
1094 data.extend(chunk.bytes);
1095 }
1096
1097 Ok(LogsResponse::new(data, node))
1098 }
1099
1100 pub async fn load_avg(&self) -> Result<LoadAvgResponse> {
1106 let mut client = MachineServiceClient::new(self.channel.clone());
1107
1108 let response = client.load_avg(()).await?;
1109 let inner = response.into_inner();
1110
1111 Ok(LoadAvgResponse::from(inner))
1112 }
1113
1114 pub async fn memory(&self) -> Result<MemoryResponse> {
1116 let mut client = MachineServiceClient::new(self.channel.clone());
1117
1118 let response = client.memory(()).await?;
1119 let inner = response.into_inner();
1120
1121 Ok(MemoryResponse::from(inner))
1122 }
1123
1124 pub async fn cpu_info(&self) -> Result<CpuInfoResponse> {
1126 let mut client = MachineServiceClient::new(self.channel.clone());
1127
1128 let response = client.cpu_info(()).await?;
1129 let inner = response.into_inner();
1130
1131 Ok(CpuInfoResponse::from(inner))
1132 }
1133
1134 pub async fn disk_stats(&self) -> Result<DiskStatsResponse> {
1136 let mut client = MachineServiceClient::new(self.channel.clone());
1137
1138 let response = client.disk_stats(()).await?;
1139 let inner = response.into_inner();
1140
1141 Ok(DiskStatsResponse::from(inner))
1142 }
1143
1144 pub async fn network_device_stats(&self) -> Result<NetworkDeviceStatsResponse> {
1146 let mut client = MachineServiceClient::new(self.channel.clone());
1147
1148 let response = client.network_device_stats(()).await?;
1149 let inner = response.into_inner();
1150
1151 Ok(NetworkDeviceStatsResponse::from(inner))
1152 }
1153
1154 pub async fn mounts(&self) -> Result<MountsResponse> {
1156 let mut client = MachineServiceClient::new(self.channel.clone());
1157
1158 let response = client.mounts(()).await?;
1159 let inner = response.into_inner();
1160
1161 Ok(MountsResponse::from(inner))
1162 }
1163
1164 pub async fn processes(&self) -> Result<ProcessesResponse> {
1166 let mut client = MachineServiceClient::new(self.channel.clone());
1167
1168 let response = client.processes(()).await?;
1169 let inner = response.into_inner();
1170
1171 Ok(ProcessesResponse::from(inner))
1172 }
1173
1174 pub async fn list(&self, request: ListRequest) -> Result<ListResponse> {
1180 use tonic::codegen::tokio_stream::StreamExt;
1181
1182 let mut client = MachineServiceClient::new(self.channel.clone());
1183
1184 let proto_request: ProtoListRequest = request.into();
1185 let response = client.list(proto_request).await?;
1186 let mut stream = response.into_inner();
1187
1188 let mut entries = Vec::new();
1189 while let Some(info) = stream.next().await {
1190 let info = info?;
1191 entries.push(FileInfo::from(info));
1192 }
1193
1194 Ok(ListResponse::new(entries))
1195 }
1196
1197 pub async fn read(&self, request: ReadRequest) -> Result<ReadResponse> {
1199 use tonic::codegen::tokio_stream::StreamExt;
1200
1201 let mut client = MachineServiceClient::new(self.channel.clone());
1202
1203 let proto_request: ProtoReadRequest = request.into();
1204 let response = client.read(proto_request).await?;
1205 let mut stream = response.into_inner();
1206
1207 let mut data = Vec::new();
1208 let mut node = None;
1209
1210 while let Some(chunk) = stream.next().await {
1211 let chunk = chunk?;
1212 if node.is_none() {
1213 if let Some(metadata) = &chunk.metadata {
1214 node = Some(metadata.hostname.clone());
1215 }
1216 }
1217 data.extend(chunk.bytes);
1218 }
1219
1220 Ok(ReadResponse::new(data, node))
1221 }
1222
1223 pub async fn copy(&self, request: CopyRequest) -> Result<CopyResponse> {
1225 use tonic::codegen::tokio_stream::StreamExt;
1226
1227 let mut client = MachineServiceClient::new(self.channel.clone());
1228
1229 let proto_request: ProtoCopyRequest = request.into();
1230 let response = client.copy(proto_request).await?;
1231 let mut stream = response.into_inner();
1232
1233 let mut data = Vec::new();
1234 let mut node = None;
1235
1236 while let Some(chunk) = stream.next().await {
1237 let chunk = chunk?;
1238 if node.is_none() {
1239 if let Some(metadata) = &chunk.metadata {
1240 node = Some(metadata.hostname.clone());
1241 }
1242 }
1243 data.extend(chunk.bytes);
1244 }
1245
1246 Ok(CopyResponse::new(data, node))
1247 }
1248
1249 pub async fn disk_usage(&self, request: DiskUsageRequest) -> Result<DiskUsageResponse> {
1251 use tonic::codegen::tokio_stream::StreamExt;
1252
1253 let mut client = MachineServiceClient::new(self.channel.clone());
1254
1255 let proto_request: ProtoDiskUsageRequest = request.into();
1256 let response = client.disk_usage(proto_request).await?;
1257 let mut stream = response.into_inner();
1258
1259 let mut entries = Vec::new();
1260 while let Some(info) = stream.next().await {
1261 let info = info?;
1262 entries.push(DiskUsageInfo::from(info));
1263 }
1264
1265 Ok(DiskUsageResponse::new(entries))
1266 }
1267
1268 pub async fn rollback(&self) -> Result<RollbackResponse> {
1274 let mut client = MachineServiceClient::new(self.channel.clone());
1275
1276 let response = client.rollback(ProtoRollbackRequest {}).await?;
1277 let inner = response.into_inner();
1278
1279 Ok(RollbackResponse::from(inner))
1280 }
1281
1282 pub async fn generate_client_configuration(
1284 &self,
1285 request: GenerateClientConfigurationRequest,
1286 ) -> Result<GenerateClientConfigurationResponse> {
1287 let mut client = MachineServiceClient::new(self.channel.clone());
1288
1289 let proto_request: ProtoGenerateClientConfigRequest = request.into();
1290 let response = client.generate_client_configuration(proto_request).await?;
1291 let inner = response.into_inner();
1292
1293 Ok(GenerateClientConfigurationResponse::from(inner))
1294 }
1295
1296 pub async fn packet_capture(
1298 &self,
1299 request: PacketCaptureRequest,
1300 ) -> Result<PacketCaptureResponse> {
1301 use tonic::codegen::tokio_stream::StreamExt;
1302
1303 let mut client = MachineServiceClient::new(self.channel.clone());
1304
1305 let proto_request: ProtoPacketCaptureRequest = request.into();
1306 let response = client.packet_capture(proto_request).await?;
1307 let mut stream = response.into_inner();
1308
1309 let mut data = Vec::new();
1310 let mut node = None;
1311
1312 while let Some(chunk) = stream.next().await {
1313 let chunk = chunk?;
1314 if node.is_none() {
1315 if let Some(metadata) = &chunk.metadata {
1316 node = Some(metadata.hostname.clone());
1317 }
1318 }
1319 data.extend(chunk.bytes);
1320 }
1321
1322 Ok(PacketCaptureResponse::new(data, node))
1323 }
1324
1325 pub async fn netstat(&self, request: NetstatRequest) -> Result<NetstatResponse> {
1327 let mut client = MachineServiceClient::new(self.channel.clone());
1328
1329 let proto_request: ProtoNetstatRequest = request.into();
1330 let response = client.netstat(proto_request).await?;
1331 let inner = response.into_inner();
1332
1333 Ok(NetstatResponse::from(inner))
1334 }
1335}
1336
/// Certificate verifier that accepts every server certificate and handshake signature.
/// Installed only when verification is explicitly skipped; it provides no authentication.
#[derive(Debug)]
struct NoVerifier;
1340
1341impl rustls::client::danger::ServerCertVerifier for NoVerifier {
1342 fn verify_server_cert(
1343 &self,
1344 _end_entity: &rustls::pki_types::CertificateDer<'_>,
1345 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
1346 _server_name: &rustls::pki_types::ServerName<'_>,
1347 _ocsp_response: &[u8],
1348 _now: rustls::pki_types::UnixTime,
1349 ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
1350 Ok(rustls::client::danger::ServerCertVerified::assertion())
1351 }
1352
1353 fn verify_tls12_signature(
1354 &self,
1355 _message: &[u8],
1356 _cert: &rustls::pki_types::CertificateDer<'_>,
1357 _dss: &rustls::DigitallySignedStruct,
1358 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1359 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1360 }
1361
1362 fn verify_tls13_signature(
1363 &self,
1364 _message: &[u8],
1365 _cert: &rustls::pki_types::CertificateDer<'_>,
1366 _dss: &rustls::DigitallySignedStruct,
1367 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1368 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1369 }
1370
1371 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1372 vec![
1373 rustls::SignatureScheme::RSA_PKCS1_SHA1,
1374 rustls::SignatureScheme::ECDSA_SHA1_Legacy,
1375 rustls::SignatureScheme::RSA_PKCS1_SHA256,
1376 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
1377 rustls::SignatureScheme::RSA_PKCS1_SHA384,
1378 rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
1379 rustls::SignatureScheme::RSA_PKCS1_SHA512,
1380 rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
1381 rustls::SignatureScheme::RSA_PSS_SHA256,
1382 rustls::SignatureScheme::RSA_PSS_SHA384,
1383 rustls::SignatureScheme::RSA_PSS_SHA512,
1384 rustls::SignatureScheme::ED25519,
1385 rustls::SignatureScheme::ED448,
1386 ]
1387 }
1388}
1389
// The `pool` submodule; the items named below are re-exported from this module.
mod pool;

pub use pool::{ConnectionPool, ConnectionPoolConfig, EndpointHealth, HealthStatus, LoadBalancer};

// The `tests` submodule is compiled only for test builds.
#[cfg(test)]
mod tests;