Skip to main content

umadb_client/
lib.rs

1use async_trait::async_trait;
2use futures::Stream;
3use futures::ready;
4use std::collections::VecDeque;
5use std::fs;
6use std::path::PathBuf;
7use std::pin::Pin;
8use std::str::FromStr;
9use std::task::{Context, Poll};
10use tonic::Request;
11use tonic::metadata::MetadataValue;
12use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint};
13
14use tokio::runtime::{Handle, Runtime};
15use umadb_dcb::{
16    DcbAppendCondition, DcbError, DcbEvent, DcbEventStoreAsync, DcbEventStoreSync, DcbQuery,
17    DcbReadResponseAsync, DcbReadResponseSync, DcbResult, DcbSequencedEvent, DcbSubscriptionAsync,
18    DcbSubscriptionSync, TrackingInfo,
19};
20
21use std::sync::{Once, OnceLock};
22use tokio::sync::watch;
23
/// A global watch channel for shutdown/cancel signals.
///
/// Only the sending half is stored. It is created lazily by `cancel_receiver()`;
/// until that has run at least once, `trigger_cancel()` finds nothing here and does nothing.
static CANCEL_SENDER: OnceLock<watch::Sender<()>> = OnceLock::new();
26
27/// Returns a receiver subscribed to the global cancel signal.
28fn cancel_receiver() -> watch::Receiver<()> {
29    let sender = CANCEL_SENDER.get_or_init(|| {
30        let (tx, _rx) = watch::channel::<()>(());
31        tx
32    });
33    sender.subscribe()
34}
35
36/// Sends the cancel signal to all receivers (e.g., on Ctrl-C).
37pub fn trigger_cancel() {
38    if let Some(sender) = CANCEL_SENDER.get() {
39        let _ = sender.send(()); // ignore error if already closed
40    }
41}
42
43static REGISTER_SIGINT: Once = Once::new();
44
45pub fn register_cancel_sigint_handler() {
46    REGISTER_SIGINT.call_once(|| {
47        // Capture the current runtime handle; panic if none exists
48        let handle = Handle::current();
49
50        // Spawn a detached task on that runtime
51        handle.spawn(async {
52            if tokio::signal::ctrl_c().await.is_ok() {
53                trigger_cancel();
54            }
55        });
56    });
57}
58
/// Builder holding the connection settings used by `connect()` / `connect_async()`.
pub struct UmaDbClient {
    /// Server URL handed to the underlying client on connect.
    url: String,
    /// Optional filesystem path of a CA certificate; forwarded to the client on connect.
    ca_path: Option<String>,
    /// Optional batch size forwarded to the client on connect.
    batch_size: Option<u32>,
    /// When `true`, connecting does not register the Ctrl-C cancel handler.
    without_sigint_handler: bool,
    /// Optional API key forwarded to the client on connect.
    api_key: Option<String>,
}
66
67impl UmaDbClient {
68    pub fn new(url: String) -> Self {
69        Self {
70            url,
71            ca_path: None,
72            batch_size: None,
73            without_sigint_handler: false,
74            api_key: None,
75        }
76    }
77
78    pub fn ca_path(self, ca_path: String) -> Self {
79        Self {
80            ca_path: Some(ca_path),
81            ..self
82        }
83    }
84
85    pub fn api_key(self, api_key: String) -> Self {
86        Self {
87            api_key: Some(api_key),
88            ..self
89        }
90    }
91
92    pub fn batch_size(self, batch_size: u32) -> Self {
93        Self {
94            batch_size: Some(batch_size),
95            ..self
96        }
97    }
98
99    pub fn without_sigint_handler(self) -> Self {
100        Self {
101            without_sigint_handler: true,
102            ..self
103        }
104    }
105
106    pub fn connect(&self) -> DcbResult<SyncUmaDbClient> {
107        let client = SyncUmaDbClient::connect(
108            self.url.clone(),
109            self.ca_path.clone(),
110            self.batch_size,
111            self.api_key.clone(),
112        );
113        if !self.without_sigint_handler
114            && let Ok(client) = &client
115        {
116            client.register_cancel_sigint_handler();
117        }
118        client
119    }
120    pub async fn connect_async(&self) -> DcbResult<AsyncUmaDbClient> {
121        let client = AsyncUmaDbClient::connect(
122            self.url.clone(),
123            self.ca_path.clone(),
124            self.batch_size,
125            self.api_key.clone(),
126        )
127        .await;
128        if !self.without_sigint_handler
129            && let Ok(client) = &client
130        {
131            client.register_cancel_sigint_handler().await;
132        }
133        client
134    }
135}
136
// --- Sync wrapper around the async client ---
/// Blocking client: every call drives the wrapped `AsyncUmaDbClient` to
/// completion with `Handle::block_on`.
pub struct SyncUmaDbClient {
    /// The async client that performs the actual RPCs.
    async_client: AsyncUmaDbClient,
    /// Handle of the runtime used for `block_on`.
    handle: Handle,
    _runtime: Option<Runtime>, // Keeps runtime alive if we created it
}
143
144impl SyncUmaDbClient {
145    /// Subscribe to events starting from an optional position.
146    /// This is a convenience wrapper around the async client's Subscribe RPC.
147    /// The returned iterator yields events indefinitely until cancelled or the stream ends.
148    pub fn subscribe(
149        &self,
150        query: Option<DcbQuery>,
151        after: Option<u64>,
152    ) -> DcbResult<Box<dyn DcbSubscriptionSync + Send + 'static>> {
153        let async_subscription = self
154            .handle
155            .block_on(self.async_client.subscribe(query, after))?;
156        Ok(Box::new(SyncClientSubscription {
157            rt: self.handle.clone(),
158            async_resp: async_subscription,
159            buffer: VecDeque::new(),
160            finished: false,
161        }))
162    }
163    pub fn connect(
164        url: String,
165        ca_path: Option<String>,
166        batch_size: Option<u32>,
167        api_key: Option<String>,
168    ) -> DcbResult<Self> {
169        let (rt, handle) = Self::get_rt_handle();
170        let async_client =
171            handle.block_on(AsyncUmaDbClient::connect(url, ca_path, batch_size, api_key))?;
172        Ok(Self {
173            async_client,
174            _runtime: rt, // Keep runtime alive for the client lifetime
175            handle,
176        })
177    }
178
179    pub fn connect_with_tls_options(
180        url: String,
181        tls_options: Option<ClientTlsOptions>,
182        batch_size: Option<u32>,
183    ) -> DcbResult<Self> {
184        let (rt, handle) = Self::get_rt_handle();
185        let async_client = handle.block_on(AsyncUmaDbClient::connect_with_tls_options(
186            url,
187            tls_options,
188            batch_size,
189            None,
190        ))?;
191        Ok(Self {
192            async_client,
193            _runtime: rt, // Keep runtime alive for the client lifetime
194            handle,
195        })
196    }
197
198    fn get_rt_handle() -> (Option<Runtime>, Handle) {
199        let (rt, handle) = {
200            // Try to use an existing runtime first
201            if let Ok(handle) = Handle::try_current() {
202                (None, handle)
203            } else {
204                // No runtime → create and own one
205                let rt = Runtime::new().expect("failed to create Tokio runtime");
206                let handle = rt.handle().clone();
207                (Some(rt), handle)
208            }
209        };
210        (rt, handle)
211    }
212
213    pub fn register_cancel_sigint_handler(&self) {
214        self.handle
215            .block_on(self.async_client.register_cancel_sigint_handler());
216    }
217}
218
219impl DcbEventStoreSync for SyncUmaDbClient {
220    fn read(
221        &self,
222        query: Option<DcbQuery>,
223        start: Option<u64>,
224        backwards: bool,
225        limit: Option<u32>,
226        subscribe: bool, // Deprecated - remove in v1.0.
227    ) -> DcbResult<Box<dyn DcbReadResponseSync + Send + 'static>> {
228        let async_read_response = self.handle.block_on(
229            self.async_client
230                .read(query, start, backwards, limit, subscribe),
231        )?;
232        Ok(Box::new(SyncClientReadResponse {
233            rt: self.handle.clone(),
234            async_resp: async_read_response,
235            buffer: VecDeque::new(),
236            finished: false,
237        }))
238    }
239
240    fn head(&self) -> DcbResult<Option<u64>> {
241        self.handle.block_on(self.async_client.head())
242    }
243
244    fn get_tracking_info(&self, source: &str) -> DcbResult<Option<u64>> {
245        self.handle
246            .block_on(self.async_client.get_tracking_info(source))
247    }
248
249    fn append(
250        &self,
251        events: Vec<DcbEvent>,
252        condition: Option<DcbAppendCondition>,
253        tracking_info: Option<TrackingInfo>,
254    ) -> DcbResult<u64> {
255        self.handle
256            .block_on(self.async_client.append(events, condition, tracking_info))
257    }
258}
259
/// Blocking wrapper around an async read response; usable as an iterator.
pub struct SyncClientReadResponse {
    /// Runtime handle used to block on the async response.
    rt: Handle,
    /// The wrapped async read response.
    async_resp: Box<dyn DcbReadResponseAsync + Send + 'static>,
    buffer: VecDeque<DcbSequencedEvent>, // efficient pop_front()
    /// Set once an empty batch has been received; no further fetches happen.
    finished: bool,
}
266
267impl SyncClientReadResponse {
268    /// Fetch the next batch from the async response, filling the buffer
269    fn fetch_next_batch(&mut self) -> DcbResult<()> {
270        if self.finished {
271            return Ok(());
272        }
273
274        let batch = self.rt.block_on(self.async_resp.next_batch())?;
275        if batch.is_empty() {
276            self.finished = true;
277        } else {
278            self.buffer = batch.into();
279        }
280        Ok(())
281    }
282}
283
284impl Iterator for SyncClientReadResponse {
285    type Item = DcbResult<DcbSequencedEvent>;
286
287    fn next(&mut self) -> Option<Self::Item> {
288        // Fetch the next batch if the buffer is empty.
289        while self.buffer.is_empty() && !self.finished {
290            if let Err(e) = self.fetch_next_batch() {
291                return Some(Err(e));
292            }
293        }
294
295        self.buffer.pop_front().map(Ok)
296    }
297}
298
299impl DcbReadResponseSync for SyncClientReadResponse {
300    fn head(&mut self) -> DcbResult<Option<u64>> {
301        self.rt.block_on(self.async_resp.head())
302    }
303
304    fn collect_with_head(&mut self) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
305        let mut out = Vec::new();
306        for result in self.by_ref() {
307            out.push(result?);
308        }
309        Ok((out, self.head()?))
310    }
311
312    fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
313        // If there are remaining events in the buffer, drain them
314        if !self.buffer.is_empty() {
315            return Ok(self.buffer.drain(..).collect());
316        }
317
318        // Otherwise fetch a new batch
319        self.fetch_next_batch()?;
320        Ok(self.buffer.drain(..).collect())
321    }
322}
323
/// Blocking wrapper around an async subscription; usable as an iterator.
pub struct SyncClientSubscription {
    /// Runtime handle used to block on the async subscription.
    rt: Handle,
    /// The wrapped async subscription.
    async_resp: Box<dyn DcbSubscriptionAsync + Send + 'static>,
    buffer: VecDeque<DcbSequencedEvent>, // efficient pop_front()
    /// Set once an empty batch has been received; no further fetches happen.
    finished: bool,
}
330
331impl SyncClientSubscription {
332    /// Fetch the next batch from the async response, filling the buffer
333    fn fetch_next_batch(&mut self) -> DcbResult<()> {
334        if self.finished {
335            return Ok(());
336        }
337
338        let batch = self.rt.block_on(self.async_resp.next_batch())?;
339        if batch.is_empty() {
340            self.finished = true;
341        } else {
342            self.buffer = batch.into();
343        }
344        Ok(())
345    }
346}
347
348impl Iterator for SyncClientSubscription {
349    type Item = DcbResult<DcbSequencedEvent>;
350
351    fn next(&mut self) -> Option<Self::Item> {
352        // Fetch the next batch if the buffer is empty.
353        while self.buffer.is_empty() && !self.finished {
354            if let Err(e) = self.fetch_next_batch() {
355                return Some(Err(e));
356            }
357        }
358
359        self.buffer.pop_front().map(Ok)
360    }
361}
362
363impl DcbSubscriptionSync for SyncClientSubscription {
364    fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
365        // If there are remaining events in the buffer, drain them
366        if !self.buffer.is_empty() {
367            return Ok(self.buffer.drain(..).collect());
368        }
369
370        // Otherwise fetch a new batch
371        self.fetch_next_batch()?;
372        Ok(self.buffer.drain(..).collect())
373    }
374}
375
// Async client implementation
/// Async gRPC client for the DCB service.
pub struct AsyncUmaDbClient {
    /// Generated gRPC client; cloned for every request.
    client: umadb_proto::v1::dcb_client::DcbClient<Channel>,
    /// Optional batch size copied into read and subscribe requests.
    batch_size: Option<u32>,
    /// `true` when the connect URL started with `https://` or `grpcs://`.
    tls_enabled: bool,
    /// Optional API key; sent as a `Bearer` token in the `authorization` metadata.
    api_key: Option<String>,
}
383
384impl AsyncUmaDbClient {
385    pub async fn subscribe(
386        &self,
387        query: Option<DcbQuery>,
388        after: Option<u64>,
389    ) -> DcbResult<Box<dyn DcbSubscriptionAsync + Send + 'static>> {
390        let query_proto = query.map(|q| q.into());
391        let mut client = self.client.clone();
392        let req_body = umadb_proto::v1::SubscribeRequest {
393            query: query_proto,
394            after,
395            batch_size: self.batch_size,
396        };
397        let req = self.add_auth(Request::new(req_body))?;
398        let response = client
399            .subscribe(req)
400            .await
401            .map_err(umadb_proto::dcb_error_from_status)?;
402        let stream = response.into_inner();
403        Ok(Box::new(AsyncClientSubscribeResponse::new(stream)))
404    }
405    pub async fn connect(
406        url: String,
407        ca_path: Option<String>,
408        batch_size: Option<u32>,
409        api_key: Option<String>,
410    ) -> DcbResult<Self> {
411        // Try to read the CA certificate.
412        let ca_pem = {
413            if let Some(ca_path) = ca_path {
414                let ca_path = PathBuf::from(ca_path);
415                Some(
416                    fs::read(&ca_path)
417                        .unwrap_or_else(|_| panic!("couldn't read cert_path: {:?}", ca_path)),
418                )
419            } else {
420                None
421            }
422        };
423
424        let client_tls_options = Some(ClientTlsOptions {
425            domain: None,
426            ca_pem,
427        });
428
429        Self::connect_with_tls_options(url, client_tls_options, batch_size, api_key).await
430    }
431
432    pub async fn connect_with_tls_options(
433        url: String,
434        tls_options: Option<ClientTlsOptions>,
435        batch_size: Option<u32>,
436        api_key: Option<String>,
437    ) -> DcbResult<Self> {
438        let tls_enabled = url.starts_with("https://") || url.starts_with("grpcs://");
439        match new_channel(url, tls_options).await {
440            Ok(channel) => Ok(Self {
441                client: umadb_proto::v1::dcb_client::DcbClient::new(channel),
442                batch_size,
443                tls_enabled,
444                api_key,
445            }),
446            Err(err) => Err(DcbError::TransportError(format!("{err}"))),
447        }
448    }
449
450    fn add_auth<T>(&self, mut req: Request<T>) -> DcbResult<Request<T>> {
451        if let Some(key) = &self.api_key {
452            if !self.tls_enabled {
453                return Err(DcbError::TransportError(
454                    "API key configured but TLS is not enabled; refusing to send credentials over insecure channel".to_string(),
455                ));
456            }
457            let token = MetadataValue::from_str(&format!("Bearer {}", key))
458                .map_err(|e| DcbError::TransportError(format!("invalid API key: {}", e)))?;
459            req.metadata_mut().insert("authorization", token);
460        }
461        Ok(req)
462    }
463
464    pub async fn register_cancel_sigint_handler(&self) {
465        register_cancel_sigint_handler();
466    }
467}
468
469#[async_trait]
470impl DcbEventStoreAsync for AsyncUmaDbClient {
471    // Async inherent methods: use the gRPC client directly (no trait required)
472    async fn read<'a>(
473        &'a self,
474        query: Option<DcbQuery>,
475        start: Option<u64>,
476        backwards: bool,
477        limit: Option<u32>,
478        subscribe: bool,
479    ) -> DcbResult<Box<dyn DcbReadResponseAsync + Send + 'static>> {
480        let query_proto = query.map(|q| q.into());
481        let req_body = umadb_proto::v1::ReadRequest {
482            query: query_proto,
483            start,
484            backwards: Some(backwards),
485            limit,
486            subscribe: Some(subscribe),
487            batch_size: self.batch_size,
488        };
489        let mut client = self.client.clone();
490        let req = self.add_auth(Request::new(req_body))?;
491        let response = client
492            .read(req)
493            .await
494            .map_err(umadb_proto::dcb_error_from_status)?;
495        let stream = response.into_inner();
496        Ok(Box::new(AsyncClientReadResponse::new(stream)))
497    }
498
499    async fn head(&self) -> DcbResult<Option<u64>> {
500        let mut client = self.client.clone();
501        let req = self.add_auth(Request::new(umadb_proto::v1::HeadRequest {}))?;
502        match client.head(req).await {
503            Ok(response) => Ok(response.into_inner().position),
504            Err(status) => Err(umadb_proto::dcb_error_from_status(status)),
505        }
506    }
507
508    async fn get_tracking_info(&self, source: &str) -> DcbResult<Option<u64>> {
509        let mut client = self.client.clone();
510        let req = self.add_auth(Request::new(umadb_proto::v1::TrackingRequest {
511            source: source.to_string(),
512        }))?;
513        match client.get_tracking_info(req).await {
514            Ok(response) => Ok(response.into_inner().position),
515            Err(status) => Err(umadb_proto::dcb_error_from_status(status)),
516        }
517    }
518
519    async fn append(
520        &self,
521        events: Vec<DcbEvent>,
522        condition: Option<DcbAppendCondition>,
523        tracking_info: Option<TrackingInfo>,
524    ) -> DcbResult<u64> {
525        let events_proto: Vec<umadb_proto::v1::Event> = events
526            .into_iter()
527            .map(umadb_proto::v1::Event::from)
528            .collect();
529        let condition_proto = condition.map(|c| umadb_proto::v1::AppendCondition {
530            fail_if_events_match: Some(c.fail_if_events_match.into()),
531            after: c.after,
532        });
533        let tracking_info_proto = tracking_info.map(|t| umadb_proto::v1::TrackingInfo {
534            source: t.source,
535            position: t.position,
536        });
537        let body = umadb_proto::v1::AppendRequest {
538            events: events_proto,
539            condition: condition_proto,
540            tracking_info: tracking_info_proto,
541        };
542        let mut client = self.client.clone();
543        let req = self.add_auth(Request::new(body))?;
544        match client.append(req).await {
545            Ok(response) => Ok(response.into_inner().position),
546            Err(status) => Err(umadb_proto::dcb_error_from_status(status)),
547        }
548    }
549}
550
/// Async read response wrapper that provides batched access and head metadata
pub struct AsyncClientReadResponse {
    /// Underlying gRPC response stream.
    stream: tonic::Streaming<umadb_proto::v1::ReadResponse>,
    /// Events decoded from the most recent message and not yet handed out.
    buffered: VecDeque<DcbSequencedEvent>,
    last_head: Option<Option<u64>>, // None = unknown yet; Some(x) = known
    /// Set when the stream has ended or the read was cancelled.
    ended: bool,
    /// Receiver for the global cancel signal, obtained in `new`.
    cancel: watch::Receiver<()>,
}
559
560impl AsyncClientReadResponse {
561    pub fn new(stream: tonic::Streaming<umadb_proto::v1::ReadResponse>) -> Self {
562        Self {
563            stream,
564            buffered: VecDeque::new(),
565            last_head: None,
566            ended: false,
567            cancel: cancel_receiver(),
568        }
569    }
570
571    /// Fetches the next batch if needed, filling the buffer
572    async fn fetch_next_if_needed(&mut self) -> DcbResult<()> {
573        if !self.buffered.is_empty() || self.ended {
574            return Ok(());
575        }
576
577        tokio::select! {
578            _ = self.cancel.changed() => {
579                self.ended = true;
580                // return Ok(());
581                return Err(DcbError::CancelledByUser());
582            }
583            msg = self.stream.message() => {
584                match msg {
585                    Ok(Some(resp)) => {
586                        self.last_head = Some(resp.head);
587                        let mut buffered = VecDeque::with_capacity(resp.events.len());
588                        for e in resp.events {
589                            if let Some(ev) = e.event {
590                                let event = DcbEvent::try_from(ev)?;
591                                buffered.push_back(DcbSequencedEvent { position: e.position, event });
592                            }
593                        }
594                        self.buffered = buffered;
595                    }
596                    Ok(None) => self.ended = true,
597                    Err(status) => return Err(umadb_proto::dcb_error_from_status(status)),
598                }
599            }
600        }
601
602        Ok(())
603    }
604}
605
606#[async_trait]
607impl DcbReadResponseAsync for AsyncClientReadResponse {
608    async fn head(&mut self) -> DcbResult<Option<u64>> {
609        if let Some(h) = self.last_head {
610            return Ok(h);
611        }
612        // Need to read at least one message to learn head
613        self.fetch_next_if_needed().await?;
614        Ok(self.last_head.unwrap_or(None))
615    }
616
617    async fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
618        if !self.buffered.is_empty() {
619            return Ok(self.buffered.drain(..).collect());
620        }
621
622        self.fetch_next_if_needed().await?;
623
624        if !self.buffered.is_empty() {
625            return Ok(self.buffered.drain(..).collect());
626        }
627
628        Ok(Vec::new())
629    }
630}
631
632impl Stream for AsyncClientReadResponse {
633    type Item = DcbResult<DcbSequencedEvent>;
634
635    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
636        let this = self.get_mut();
637
638        loop {
639            // Return buffered event if available
640            if let Some(ev) = this.buffered.pop_front() {
641                return Poll::Ready(Some(Ok(ev)));
642            }
643
644            // Stop if the stream ended.
645            if this.ended {
646                return Poll::Ready(None);
647            }
648
649            // Poll the underlying tonic::Streaming
650            return match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
651                Some(Ok(resp)) => {
652                    this.last_head = Some(resp.head);
653
654                    let mut buffered = VecDeque::with_capacity(resp.events.len());
655                    for e in resp.events {
656                        if let Some(ev) = e.event {
657                            // Propagate any conversion error using DCBResult.
658                            let event = match DcbEvent::try_from(ev) {
659                                Ok(event) => event,
660                                Err(err) => return Poll::Ready(Some(Err(err))),
661                            };
662                            buffered.push_back(DcbSequencedEvent {
663                                position: e.position,
664                                event,
665                            });
666                        }
667                    }
668
669                    this.buffered = buffered;
670
671                    // If the batch is empty, loop again to poll the next message
672                    if this.buffered.is_empty() {
673                        continue;
674                    }
675
676                    // Otherwise, return the first event
677                    let ev = this.buffered.pop_front().unwrap();
678                    Poll::Ready(Some(Ok(ev)))
679                }
680                Some(Err(status)) => {
681                    this.ended = true;
682                    Poll::Ready(Some(Err(umadb_proto::dcb_error_from_status(status))))
683                }
684                None => {
685                    this.ended = true;
686                    Poll::Ready(None)
687                }
688            };
689        }
690    }
691}
692
// Async subscribe response wrapper: similar to AsyncClientReadResponse but without head
/// Async subscription wrapper providing batched and streamed access to events.
pub struct AsyncClientSubscribeResponse {
    /// Underlying gRPC response stream.
    stream: tonic::Streaming<umadb_proto::v1::SubscribeResponse>,
    /// Events decoded from the most recent message and not yet handed out.
    buffered: VecDeque<DcbSequencedEvent>,
    /// Set when the stream has ended or the subscription was cancelled.
    ended: bool,
    /// Receiver for the global cancel signal, obtained in `new`.
    cancel: watch::Receiver<()>,
}
700
701impl AsyncClientSubscribeResponse {
702    pub fn new(stream: tonic::Streaming<umadb_proto::v1::SubscribeResponse>) -> Self {
703        Self {
704            stream,
705            buffered: VecDeque::new(),
706            ended: false,
707            cancel: cancel_receiver(),
708        }
709    }
710
711    async fn fetch_next_if_needed(&mut self) -> DcbResult<()> {
712        if !self.buffered.is_empty() || self.ended {
713            return Ok(());
714        }
715
716        tokio::select! {
717            _ = self.cancel.changed() => {
718                self.ended = true;
719                return Err(DcbError::CancelledByUser());
720            }
721            msg = self.stream.message() => {
722                match msg {
723                    Ok(Some(resp)) => {
724                        let mut buffered = VecDeque::with_capacity(resp.events.len());
725                        for e in resp.events {
726                            if let Some(ev) = e.event {
727                                let event = DcbEvent::try_from(ev)?;
728                                buffered.push_back(DcbSequencedEvent { position: e.position, event });
729                            }
730                        }
731                        self.buffered = buffered;
732                    }
733                    Ok(None) => self.ended = true,
734                    Err(status) => return Err(umadb_proto::dcb_error_from_status(status)),
735                }
736            }
737        }
738        Ok(())
739    }
740}
741
742#[async_trait]
743impl DcbSubscriptionAsync for AsyncClientSubscribeResponse {
744    async fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
745        if !self.buffered.is_empty() {
746            return Ok(self.buffered.drain(..).collect());
747        }
748        self.fetch_next_if_needed().await?;
749        if !self.buffered.is_empty() {
750            return Ok(self.buffered.drain(..).collect());
751        }
752        Ok(Vec::new())
753    }
754}
755
756impl Stream for AsyncClientSubscribeResponse {
757    type Item = DcbResult<DcbSequencedEvent>;
758
759    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
760        let this = self.get_mut();
761
762        loop {
763            if let Some(ev) = this.buffered.pop_front() {
764                return Poll::Ready(Some(Ok(ev)));
765            }
766            if this.ended {
767                return Poll::Ready(None);
768            }
769
770            return match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
771                Some(Ok(resp)) => {
772                    let mut buffered = VecDeque::with_capacity(resp.events.len());
773                    for e in resp.events {
774                        if let Some(ev) = e.event {
775                            let event = match DcbEvent::try_from(ev) {
776                                Ok(event) => event,
777                                Err(err) => return Poll::Ready(Some(Err(err))),
778                            };
779                            buffered.push_back(DcbSequencedEvent {
780                                position: e.position,
781                                event,
782                            });
783                        }
784                    }
785                    this.buffered = buffered;
786                    if this.buffered.is_empty() {
787                        continue;
788                    }
789                    let ev = this.buffered.pop_front().unwrap();
790                    Poll::Ready(Some(Ok(ev)))
791                }
792                Some(Err(status)) => {
793                    this.ended = true;
794                    Poll::Ready(Some(Err(umadb_proto::dcb_error_from_status(status))))
795                }
796                None => {
797                    this.ended = true;
798                    Poll::Ready(None)
799                }
800            };
801        }
802    }
803}
804
/// TLS settings applied to the client endpoint when connecting.
#[derive(Clone, Debug, Default)]
pub struct ClientTlsOptions {
    /// Optional domain name set on the TLS configuration.
    pub domain: Option<String>,
    pub ca_pem: Option<Vec<u8>>, // trusted CA cert in PEM for self-signed setups
}
810
811async fn new_channel(
812    url: String,
813    tls: Option<ClientTlsOptions>,
814) -> Result<Channel, tonic::transport::Error> {
815    new_endpoint(url, tls)?.connect().await
816}
817
818fn new_endpoint(
819    url: String,
820    tls: Option<ClientTlsOptions>,
821) -> Result<Endpoint, tonic::transport::Error> {
822    use std::time::Duration;
823
824    // Accept grpcs:// as an alias for https://
825    let mut url_owned = url.to_string();
826    if url_owned.starts_with("grpcs://") {
827        url_owned = url_owned.replacen("grpcs://", "https://", 1);
828    }
829
830    let mut endpoint = Endpoint::from_shared(url_owned)?
831        .tcp_nodelay(true)
832        .http2_keep_alive_interval(Duration::from_secs(5))
833        .keep_alive_timeout(Duration::from_secs(10))
834        .initial_stream_window_size(Some(4 * 1024 * 1024))
835        .initial_connection_window_size(Some(8 * 1024 * 1024));
836
837    if let Some(opts) = tls {
838        let mut cfg = ClientTlsConfig::new();
839        if let Some(domain) = &opts.domain {
840            cfg = cfg.domain_name(domain.clone());
841        }
842        if let Some(ca) = opts.ca_pem {
843            cfg = cfg.ca_certificate(Certificate::from_pem(ca));
844        }
845        endpoint = endpoint.tls_config(cfg)?;
846    } else if url.starts_with("https://") {
847        // When using https without explicit options, still enable default TLS.
848        endpoint = endpoint.tls_config(ClientTlsConfig::new())?;
849    }
850
851    Ok(endpoint)
852}