1use async_trait::async_trait;
2use futures::Stream;
3use futures::ready;
4use std::collections::VecDeque;
5use std::fs;
6use std::path::PathBuf;
7use std::pin::Pin;
8use std::str::FromStr;
9use std::task::{Context, Poll};
10use tonic::Request;
11use tonic::metadata::MetadataValue;
12use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint};
13
14use tokio::runtime::{Handle, Runtime};
15use umadb_dcb::{
16 DcbAppendCondition, DcbError, DcbEvent, DcbEventStoreAsync, DcbEventStoreSync, DcbQuery,
17 DcbReadResponseAsync, DcbReadResponseSync, DcbResult, DcbSequencedEvent, DcbSubscriptionAsync,
18 DcbSubscriptionSync, TrackingInfo,
19};
20
21use std::sync::{Once, OnceLock};
22use tokio::sync::watch;
23
/// Process-wide sender half of the cancellation channel.
///
/// It is created lazily by `cancel_receiver` on first use; `trigger_cancel`
/// sends on it to notify the receivers handed out so far.
static CANCEL_SENDER: OnceLock<watch::Sender<()>> = OnceLock::new();
26
27fn cancel_receiver() -> watch::Receiver<()> {
29 let sender = CANCEL_SENDER.get_or_init(|| {
30 let (tx, _rx) = watch::channel::<()>(());
31 tx
32 });
33 sender.subscribe()
34}
35
36pub fn trigger_cancel() {
38 if let Some(sender) = CANCEL_SENDER.get() {
39 let _ = sender.send(()); }
41}
42
/// Guards `register_cancel_sigint_handler` so that its registration closure
/// runs at most once per process.
static REGISTER_SIGINT: Once = Once::new();
44
45pub fn register_cancel_sigint_handler() {
46 REGISTER_SIGINT.call_once(|| {
47 let handle = Handle::current();
49
50 handle.spawn(async {
52 if tokio::signal::ctrl_c().await.is_ok() {
53 trigger_cancel();
54 }
55 });
56 });
57}
58
/// Builder holding the connection settings used by `connect` and
/// `connect_async`.
pub struct UmaDbClient {
    /// Server URL handed to the underlying client on connect.
    url: String,
    /// Optional path to a CA certificate file; the async client reads it when connecting.
    ca_path: Option<String>,
    /// Optional batch size forwarded with read and subscribe requests.
    batch_size: Option<u32>,
    /// When `true`, connecting does not register the Ctrl-C cancellation handler.
    without_sigint_handler: bool,
    /// Optional API key; the async client sends it as a `Bearer` authorization value.
    api_key: Option<String>,
}
66
67impl UmaDbClient {
68 pub fn new(url: String) -> Self {
69 Self {
70 url,
71 ca_path: None,
72 batch_size: None,
73 without_sigint_handler: false,
74 api_key: None,
75 }
76 }
77
78 pub fn ca_path(self, ca_path: String) -> Self {
79 Self {
80 ca_path: Some(ca_path),
81 ..self
82 }
83 }
84
85 pub fn api_key(self, api_key: String) -> Self {
86 Self {
87 api_key: Some(api_key),
88 ..self
89 }
90 }
91
92 pub fn batch_size(self, batch_size: u32) -> Self {
93 Self {
94 batch_size: Some(batch_size),
95 ..self
96 }
97 }
98
99 pub fn without_sigint_handler(self) -> Self {
100 Self {
101 without_sigint_handler: true,
102 ..self
103 }
104 }
105
106 pub fn connect(&self) -> DcbResult<SyncUmaDbClient> {
107 let client = SyncUmaDbClient::connect(
108 self.url.clone(),
109 self.ca_path.clone(),
110 self.batch_size,
111 self.api_key.clone(),
112 );
113 if !self.without_sigint_handler
114 && let Ok(client) = &client
115 {
116 client.register_cancel_sigint_handler();
117 }
118 client
119 }
120 pub async fn connect_async(&self) -> DcbResult<AsyncUmaDbClient> {
121 let client = AsyncUmaDbClient::connect(
122 self.url.clone(),
123 self.ca_path.clone(),
124 self.batch_size,
125 self.api_key.clone(),
126 )
127 .await;
128 if !self.without_sigint_handler
129 && let Ok(client) = &client
130 {
131 client.register_cancel_sigint_handler().await;
132 }
133 client
134 }
135}
136
/// Blocking client that drives an `AsyncUmaDbClient` through a Tokio runtime
/// handle.
pub struct SyncUmaDbClient {
    /// The async client that performs the actual requests.
    async_client: AsyncUmaDbClient,
    /// Handle used to block on the async client's futures.
    handle: Handle,
    /// Runtime owned by this client when `get_rt_handle` had to create one;
    /// `None` when an already-current runtime's handle was reused.
    _runtime: Option<Runtime>,
}
143
144impl SyncUmaDbClient {
145 pub fn subscribe(
149 &self,
150 query: Option<DcbQuery>,
151 after: Option<u64>,
152 ) -> DcbResult<Box<dyn DcbSubscriptionSync + Send + 'static>> {
153 let async_subscription = self
154 .handle
155 .block_on(self.async_client.subscribe(query, after))?;
156 Ok(Box::new(SyncClientSubscription {
157 rt: self.handle.clone(),
158 async_resp: async_subscription,
159 buffer: VecDeque::new(),
160 finished: false,
161 }))
162 }
163 pub fn connect(
164 url: String,
165 ca_path: Option<String>,
166 batch_size: Option<u32>,
167 api_key: Option<String>,
168 ) -> DcbResult<Self> {
169 let (rt, handle) = Self::get_rt_handle();
170 let async_client =
171 handle.block_on(AsyncUmaDbClient::connect(url, ca_path, batch_size, api_key))?;
172 Ok(Self {
173 async_client,
174 _runtime: rt, handle,
176 })
177 }
178
179 pub fn connect_with_tls_options(
180 url: String,
181 tls_options: Option<ClientTlsOptions>,
182 batch_size: Option<u32>,
183 ) -> DcbResult<Self> {
184 let (rt, handle) = Self::get_rt_handle();
185 let async_client = handle.block_on(AsyncUmaDbClient::connect_with_tls_options(
186 url,
187 tls_options,
188 batch_size,
189 None,
190 ))?;
191 Ok(Self {
192 async_client,
193 _runtime: rt, handle,
195 })
196 }
197
198 fn get_rt_handle() -> (Option<Runtime>, Handle) {
199 let (rt, handle) = {
200 if let Ok(handle) = Handle::try_current() {
202 (None, handle)
203 } else {
204 let rt = Runtime::new().expect("failed to create Tokio runtime");
206 let handle = rt.handle().clone();
207 (Some(rt), handle)
208 }
209 };
210 (rt, handle)
211 }
212
213 pub fn register_cancel_sigint_handler(&self) {
214 self.handle
215 .block_on(self.async_client.register_cancel_sigint_handler());
216 }
217}
218
219impl DcbEventStoreSync for SyncUmaDbClient {
220 fn read(
221 &self,
222 query: Option<DcbQuery>,
223 start: Option<u64>,
224 backwards: bool,
225 limit: Option<u32>,
226 subscribe: bool, ) -> DcbResult<Box<dyn DcbReadResponseSync + Send + 'static>> {
228 let async_read_response = self.handle.block_on(
229 self.async_client
230 .read(query, start, backwards, limit, subscribe),
231 )?;
232 Ok(Box::new(SyncClientReadResponse {
233 rt: self.handle.clone(),
234 async_resp: async_read_response,
235 buffer: VecDeque::new(),
236 finished: false,
237 }))
238 }
239
240 fn head(&self) -> DcbResult<Option<u64>> {
241 self.handle.block_on(self.async_client.head())
242 }
243
244 fn get_tracking_info(&self, source: &str) -> DcbResult<Option<u64>> {
245 self.handle
246 .block_on(self.async_client.get_tracking_info(source))
247 }
248
249 fn append(
250 &self,
251 events: Vec<DcbEvent>,
252 condition: Option<DcbAppendCondition>,
253 tracking_info: Option<TrackingInfo>,
254 ) -> DcbResult<u64> {
255 self.handle
256 .block_on(self.async_client.append(events, condition, tracking_info))
257 }
258}
259
/// Blocking wrapper around an async read response.
pub struct SyncClientReadResponse {
    /// Runtime handle used to block on the async response.
    rt: Handle,
    /// The wrapped async read response.
    async_resp: Box<dyn DcbReadResponseAsync + Send + 'static>,
    /// Events from the most recently fetched batch that have not been handed out yet.
    buffer: VecDeque<DcbSequencedEvent>,
    /// Set once the async response has returned an empty batch.
    finished: bool,
}
266
267impl SyncClientReadResponse {
268 fn fetch_next_batch(&mut self) -> DcbResult<()> {
270 if self.finished {
271 return Ok(());
272 }
273
274 let batch = self.rt.block_on(self.async_resp.next_batch())?;
275 if batch.is_empty() {
276 self.finished = true;
277 } else {
278 self.buffer = batch.into();
279 }
280 Ok(())
281 }
282}
283
284impl Iterator for SyncClientReadResponse {
285 type Item = DcbResult<DcbSequencedEvent>;
286
287 fn next(&mut self) -> Option<Self::Item> {
288 while self.buffer.is_empty() && !self.finished {
290 if let Err(e) = self.fetch_next_batch() {
291 return Some(Err(e));
292 }
293 }
294
295 self.buffer.pop_front().map(Ok)
296 }
297}
298
299impl DcbReadResponseSync for SyncClientReadResponse {
300 fn head(&mut self) -> DcbResult<Option<u64>> {
301 self.rt.block_on(self.async_resp.head())
302 }
303
304 fn collect_with_head(&mut self) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
305 let mut out = Vec::new();
306 for result in self.by_ref() {
307 out.push(result?);
308 }
309 Ok((out, self.head()?))
310 }
311
312 fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
313 if !self.buffer.is_empty() {
315 return Ok(self.buffer.drain(..).collect());
316 }
317
318 self.fetch_next_batch()?;
320 Ok(self.buffer.drain(..).collect())
321 }
322}
323
/// Blocking wrapper around an async subscription.
pub struct SyncClientSubscription {
    /// Runtime handle used to block on the async subscription.
    rt: Handle,
    /// The wrapped async subscription.
    async_resp: Box<dyn DcbSubscriptionAsync + Send + 'static>,
    /// Events from the most recently fetched batch that have not been handed out yet.
    buffer: VecDeque<DcbSequencedEvent>,
    /// Set once the async subscription has returned an empty batch.
    finished: bool,
}
330
331impl SyncClientSubscription {
332 fn fetch_next_batch(&mut self) -> DcbResult<()> {
334 if self.finished {
335 return Ok(());
336 }
337
338 let batch = self.rt.block_on(self.async_resp.next_batch())?;
339 if batch.is_empty() {
340 self.finished = true;
341 } else {
342 self.buffer = batch.into();
343 }
344 Ok(())
345 }
346}
347
348impl Iterator for SyncClientSubscription {
349 type Item = DcbResult<DcbSequencedEvent>;
350
351 fn next(&mut self) -> Option<Self::Item> {
352 while self.buffer.is_empty() && !self.finished {
354 if let Err(e) = self.fetch_next_batch() {
355 return Some(Err(e));
356 }
357 }
358
359 self.buffer.pop_front().map(Ok)
360 }
361}
362
363impl DcbSubscriptionSync for SyncClientSubscription {
364 fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
365 if !self.buffer.is_empty() {
367 return Ok(self.buffer.drain(..).collect());
368 }
369
370 self.fetch_next_batch()?;
372 Ok(self.buffer.drain(..).collect())
373 }
374}
375
/// Async gRPC client for the DCB service.
pub struct AsyncUmaDbClient {
    /// Generated gRPC client; it is cloned for each request.
    client: umadb_proto::v1::dcb_client::DcbClient<Channel>,
    /// Optional batch size forwarded with read and subscribe requests.
    batch_size: Option<u32>,
    /// `true` when the connection URL started with `https://` or `grpcs://`.
    tls_enabled: bool,
    /// Optional API key sent as a `Bearer` authorization value; only sent when TLS is enabled.
    api_key: Option<String>,
}
383
384impl AsyncUmaDbClient {
385 pub async fn subscribe(
386 &self,
387 query: Option<DcbQuery>,
388 after: Option<u64>,
389 ) -> DcbResult<Box<dyn DcbSubscriptionAsync + Send + 'static>> {
390 let query_proto = query.map(|q| q.into());
391 let mut client = self.client.clone();
392 let req_body = umadb_proto::v1::SubscribeRequest {
393 query: query_proto,
394 after,
395 batch_size: self.batch_size,
396 };
397 let req = self.add_auth(Request::new(req_body))?;
398 let response = client
399 .subscribe(req)
400 .await
401 .map_err(umadb_proto::dcb_error_from_status)?;
402 let stream = response.into_inner();
403 Ok(Box::new(AsyncClientSubscribeResponse::new(stream)))
404 }
405 pub async fn connect(
406 url: String,
407 ca_path: Option<String>,
408 batch_size: Option<u32>,
409 api_key: Option<String>,
410 ) -> DcbResult<Self> {
411 let ca_pem = {
413 if let Some(ca_path) = ca_path {
414 let ca_path = PathBuf::from(ca_path);
415 Some(
416 fs::read(&ca_path)
417 .unwrap_or_else(|_| panic!("couldn't read cert_path: {:?}", ca_path)),
418 )
419 } else {
420 None
421 }
422 };
423
424 let client_tls_options = Some(ClientTlsOptions {
425 domain: None,
426 ca_pem,
427 });
428
429 Self::connect_with_tls_options(url, client_tls_options, batch_size, api_key).await
430 }
431
432 pub async fn connect_with_tls_options(
433 url: String,
434 tls_options: Option<ClientTlsOptions>,
435 batch_size: Option<u32>,
436 api_key: Option<String>,
437 ) -> DcbResult<Self> {
438 let tls_enabled = url.starts_with("https://") || url.starts_with("grpcs://");
439 match new_channel(url, tls_options).await {
440 Ok(channel) => Ok(Self {
441 client: umadb_proto::v1::dcb_client::DcbClient::new(channel),
442 batch_size,
443 tls_enabled,
444 api_key,
445 }),
446 Err(err) => Err(DcbError::TransportError(format!("{err}"))),
447 }
448 }
449
450 fn add_auth<T>(&self, mut req: Request<T>) -> DcbResult<Request<T>> {
451 if let Some(key) = &self.api_key {
452 if !self.tls_enabled {
453 return Err(DcbError::TransportError(
454 "API key configured but TLS is not enabled; refusing to send credentials over insecure channel".to_string(),
455 ));
456 }
457 let token = MetadataValue::from_str(&format!("Bearer {}", key))
458 .map_err(|e| DcbError::TransportError(format!("invalid API key: {}", e)))?;
459 req.metadata_mut().insert("authorization", token);
460 }
461 Ok(req)
462 }
463
464 pub async fn register_cancel_sigint_handler(&self) {
465 register_cancel_sigint_handler();
466 }
467}
468
469#[async_trait]
470impl DcbEventStoreAsync for AsyncUmaDbClient {
471 async fn read<'a>(
473 &'a self,
474 query: Option<DcbQuery>,
475 start: Option<u64>,
476 backwards: bool,
477 limit: Option<u32>,
478 subscribe: bool,
479 ) -> DcbResult<Box<dyn DcbReadResponseAsync + Send + 'static>> {
480 let query_proto = query.map(|q| q.into());
481 let req_body = umadb_proto::v1::ReadRequest {
482 query: query_proto,
483 start,
484 backwards: Some(backwards),
485 limit,
486 subscribe: Some(subscribe),
487 batch_size: self.batch_size,
488 };
489 let mut client = self.client.clone();
490 let req = self.add_auth(Request::new(req_body))?;
491 let response = client
492 .read(req)
493 .await
494 .map_err(umadb_proto::dcb_error_from_status)?;
495 let stream = response.into_inner();
496 Ok(Box::new(AsyncClientReadResponse::new(stream)))
497 }
498
499 async fn head(&self) -> DcbResult<Option<u64>> {
500 let mut client = self.client.clone();
501 let req = self.add_auth(Request::new(umadb_proto::v1::HeadRequest {}))?;
502 match client.head(req).await {
503 Ok(response) => Ok(response.into_inner().position),
504 Err(status) => Err(umadb_proto::dcb_error_from_status(status)),
505 }
506 }
507
508 async fn get_tracking_info(&self, source: &str) -> DcbResult<Option<u64>> {
509 let mut client = self.client.clone();
510 let req = self.add_auth(Request::new(umadb_proto::v1::TrackingRequest {
511 source: source.to_string(),
512 }))?;
513 match client.get_tracking_info(req).await {
514 Ok(response) => Ok(response.into_inner().position),
515 Err(status) => Err(umadb_proto::dcb_error_from_status(status)),
516 }
517 }
518
519 async fn append(
520 &self,
521 events: Vec<DcbEvent>,
522 condition: Option<DcbAppendCondition>,
523 tracking_info: Option<TrackingInfo>,
524 ) -> DcbResult<u64> {
525 let events_proto: Vec<umadb_proto::v1::Event> = events
526 .into_iter()
527 .map(umadb_proto::v1::Event::from)
528 .collect();
529 let condition_proto = condition.map(|c| umadb_proto::v1::AppendCondition {
530 fail_if_events_match: Some(c.fail_if_events_match.into()),
531 after: c.after,
532 });
533 let tracking_info_proto = tracking_info.map(|t| umadb_proto::v1::TrackingInfo {
534 source: t.source,
535 position: t.position,
536 });
537 let body = umadb_proto::v1::AppendRequest {
538 events: events_proto,
539 condition: condition_proto,
540 tracking_info: tracking_info_proto,
541 };
542 let mut client = self.client.clone();
543 let req = self.add_auth(Request::new(body))?;
544 match client.append(req).await {
545 Ok(response) => Ok(response.into_inner().position),
546 Err(status) => Err(umadb_proto::dcb_error_from_status(status)),
547 }
548 }
549}
550
/// Async read response backed by a tonic stream of `ReadResponse` messages.
pub struct AsyncClientReadResponse {
    /// The underlying gRPC response stream.
    stream: tonic::Streaming<umadb_proto::v1::ReadResponse>,
    /// Decoded events from the latest message that have not been handed out yet.
    buffered: VecDeque<DcbSequencedEvent>,
    /// `head` value of the most recently received message; `None` until a
    /// message has been received.
    last_head: Option<Option<u64>>,
    /// Set once the stream has ended, a cancellation was observed, or (in the
    /// `Stream` implementation) the stream returned an error.
    ended: bool,
    /// Receiver on the process-wide cancellation channel.
    cancel: watch::Receiver<()>,
}
559
560impl AsyncClientReadResponse {
561 pub fn new(stream: tonic::Streaming<umadb_proto::v1::ReadResponse>) -> Self {
562 Self {
563 stream,
564 buffered: VecDeque::new(),
565 last_head: None,
566 ended: false,
567 cancel: cancel_receiver(),
568 }
569 }
570
571 async fn fetch_next_if_needed(&mut self) -> DcbResult<()> {
573 if !self.buffered.is_empty() || self.ended {
574 return Ok(());
575 }
576
577 tokio::select! {
578 _ = self.cancel.changed() => {
579 self.ended = true;
580 return Err(DcbError::CancelledByUser());
582 }
583 msg = self.stream.message() => {
584 match msg {
585 Ok(Some(resp)) => {
586 self.last_head = Some(resp.head);
587 let mut buffered = VecDeque::with_capacity(resp.events.len());
588 for e in resp.events {
589 if let Some(ev) = e.event {
590 let event = DcbEvent::try_from(ev)?;
591 buffered.push_back(DcbSequencedEvent { position: e.position, event });
592 }
593 }
594 self.buffered = buffered;
595 }
596 Ok(None) => self.ended = true,
597 Err(status) => return Err(umadb_proto::dcb_error_from_status(status)),
598 }
599 }
600 }
601
602 Ok(())
603 }
604}
605
606#[async_trait]
607impl DcbReadResponseAsync for AsyncClientReadResponse {
608 async fn head(&mut self) -> DcbResult<Option<u64>> {
609 if let Some(h) = self.last_head {
610 return Ok(h);
611 }
612 self.fetch_next_if_needed().await?;
614 Ok(self.last_head.unwrap_or(None))
615 }
616
617 async fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
618 if !self.buffered.is_empty() {
619 return Ok(self.buffered.drain(..).collect());
620 }
621
622 self.fetch_next_if_needed().await?;
623
624 if !self.buffered.is_empty() {
625 return Ok(self.buffered.drain(..).collect());
626 }
627
628 Ok(Vec::new())
629 }
630}
631
632impl Stream for AsyncClientReadResponse {
633 type Item = DcbResult<DcbSequencedEvent>;
634
635 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
636 let this = self.get_mut();
637
638 loop {
639 if let Some(ev) = this.buffered.pop_front() {
641 return Poll::Ready(Some(Ok(ev)));
642 }
643
644 if this.ended {
646 return Poll::Ready(None);
647 }
648
649 return match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
651 Some(Ok(resp)) => {
652 this.last_head = Some(resp.head);
653
654 let mut buffered = VecDeque::with_capacity(resp.events.len());
655 for e in resp.events {
656 if let Some(ev) = e.event {
657 let event = match DcbEvent::try_from(ev) {
659 Ok(event) => event,
660 Err(err) => return Poll::Ready(Some(Err(err))),
661 };
662 buffered.push_back(DcbSequencedEvent {
663 position: e.position,
664 event,
665 });
666 }
667 }
668
669 this.buffered = buffered;
670
671 if this.buffered.is_empty() {
673 continue;
674 }
675
676 let ev = this.buffered.pop_front().unwrap();
678 Poll::Ready(Some(Ok(ev)))
679 }
680 Some(Err(status)) => {
681 this.ended = true;
682 Poll::Ready(Some(Err(umadb_proto::dcb_error_from_status(status))))
683 }
684 None => {
685 this.ended = true;
686 Poll::Ready(None)
687 }
688 };
689 }
690 }
691}
692
/// Async subscription backed by a tonic stream of `SubscribeResponse` messages.
pub struct AsyncClientSubscribeResponse {
    /// The underlying gRPC response stream.
    stream: tonic::Streaming<umadb_proto::v1::SubscribeResponse>,
    /// Decoded events from the latest message that have not been handed out yet.
    buffered: VecDeque<DcbSequencedEvent>,
    /// Set once the stream has ended, a cancellation was observed, or (in the
    /// `Stream` implementation) the stream returned an error.
    ended: bool,
    /// Receiver on the process-wide cancellation channel.
    cancel: watch::Receiver<()>,
}
700
701impl AsyncClientSubscribeResponse {
702 pub fn new(stream: tonic::Streaming<umadb_proto::v1::SubscribeResponse>) -> Self {
703 Self {
704 stream,
705 buffered: VecDeque::new(),
706 ended: false,
707 cancel: cancel_receiver(),
708 }
709 }
710
711 async fn fetch_next_if_needed(&mut self) -> DcbResult<()> {
712 if !self.buffered.is_empty() || self.ended {
713 return Ok(());
714 }
715
716 tokio::select! {
717 _ = self.cancel.changed() => {
718 self.ended = true;
719 return Err(DcbError::CancelledByUser());
720 }
721 msg = self.stream.message() => {
722 match msg {
723 Ok(Some(resp)) => {
724 let mut buffered = VecDeque::with_capacity(resp.events.len());
725 for e in resp.events {
726 if let Some(ev) = e.event {
727 let event = DcbEvent::try_from(ev)?;
728 buffered.push_back(DcbSequencedEvent { position: e.position, event });
729 }
730 }
731 self.buffered = buffered;
732 }
733 Ok(None) => self.ended = true,
734 Err(status) => return Err(umadb_proto::dcb_error_from_status(status)),
735 }
736 }
737 }
738 Ok(())
739 }
740}
741
742#[async_trait]
743impl DcbSubscriptionAsync for AsyncClientSubscribeResponse {
744 async fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
745 if !self.buffered.is_empty() {
746 return Ok(self.buffered.drain(..).collect());
747 }
748 self.fetch_next_if_needed().await?;
749 if !self.buffered.is_empty() {
750 return Ok(self.buffered.drain(..).collect());
751 }
752 Ok(Vec::new())
753 }
754}
755
756impl Stream for AsyncClientSubscribeResponse {
757 type Item = DcbResult<DcbSequencedEvent>;
758
759 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
760 let this = self.get_mut();
761
762 loop {
763 if let Some(ev) = this.buffered.pop_front() {
764 return Poll::Ready(Some(Ok(ev)));
765 }
766 if this.ended {
767 return Poll::Ready(None);
768 }
769
770 return match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
771 Some(Ok(resp)) => {
772 let mut buffered = VecDeque::with_capacity(resp.events.len());
773 for e in resp.events {
774 if let Some(ev) = e.event {
775 let event = match DcbEvent::try_from(ev) {
776 Ok(event) => event,
777 Err(err) => return Poll::Ready(Some(Err(err))),
778 };
779 buffered.push_back(DcbSequencedEvent {
780 position: e.position,
781 event,
782 });
783 }
784 }
785 this.buffered = buffered;
786 if this.buffered.is_empty() {
787 continue;
788 }
789 let ev = this.buffered.pop_front().unwrap();
790 Poll::Ready(Some(Ok(ev)))
791 }
792 Some(Err(status)) => {
793 this.ended = true;
794 Poll::Ready(Some(Err(umadb_proto::dcb_error_from_status(status))))
795 }
796 None => {
797 this.ended = true;
798 Poll::Ready(None)
799 }
800 };
801 }
802 }
803}
804
/// TLS settings applied to the client endpoint.
#[derive(Clone, Debug, Default)]
pub struct ClientTlsOptions {
    /// Domain name passed to the TLS configuration when set.
    pub domain: Option<String>,
    /// PEM bytes installed as the CA certificate when set.
    pub ca_pem: Option<Vec<u8>>,
}
810
811async fn new_channel(
812 url: String,
813 tls: Option<ClientTlsOptions>,
814) -> Result<Channel, tonic::transport::Error> {
815 new_endpoint(url, tls)?.connect().await
816}
817
818fn new_endpoint(
819 url: String,
820 tls: Option<ClientTlsOptions>,
821) -> Result<Endpoint, tonic::transport::Error> {
822 use std::time::Duration;
823
824 let mut url_owned = url.to_string();
826 if url_owned.starts_with("grpcs://") {
827 url_owned = url_owned.replacen("grpcs://", "https://", 1);
828 }
829
830 let mut endpoint = Endpoint::from_shared(url_owned)?
831 .tcp_nodelay(true)
832 .http2_keep_alive_interval(Duration::from_secs(5))
833 .keep_alive_timeout(Duration::from_secs(10))
834 .initial_stream_window_size(Some(4 * 1024 * 1024))
835 .initial_connection_window_size(Some(8 * 1024 * 1024));
836
837 if let Some(opts) = tls {
838 let mut cfg = ClientTlsConfig::new();
839 if let Some(domain) = &opts.domain {
840 cfg = cfg.domain_name(domain.clone());
841 }
842 if let Some(ca) = opts.ca_pem {
843 cfg = cfg.ca_certificate(Certificate::from_pem(ca));
844 }
845 endpoint = endpoint.tls_config(cfg)?;
846 } else if url.starts_with("https://") {
847 endpoint = endpoint.tls_config(ClientTlsConfig::new())?;
849 }
850
851 Ok(endpoint)
852}