architect_sdk/symbology/
client.rs

1use super::store::SymbologyStore;
2use crate::{
3    grpc::GrpcClientConfig,
4    synced::{Synced, SyncedHandle},
5};
6use anyhow::{bail, Result};
7use architect_api::{
8    grpc::service::symbology_client::SymbologyClient as SymbologyGrpcClient,
9    symbology::protocol::{SubscribeSymbology, SymbologyUpdate},
10    utils::sequence::SequenceIdAndNumber,
11};
12use log::{debug, trace};
13use tonic::{
14    transport::{Channel, Endpoint},
15    Streaming,
16};
17use url::Url;
18
19pub struct SymbologyClient {
20    grpc_endpoint: Endpoint,
21    grpc_config: GrpcClientConfig,
22    grpc: SymbologyGrpcClient<Channel>,
23    upstream_seqno: Option<SequenceIdAndNumber>,
24    updates: Streaming<SymbologyUpdate>,
25    pub store: SymbologyStore,
26    pub ready: Synced<bool>,
27}
28
29#[derive(Clone)]
30pub struct SymbologyClientHandle {
31    pub store: SymbologyStore,
32    pub ready: SyncedHandle<bool>,
33}
34
35impl SymbologyClient {
36    pub async fn connect(url: &Url, grpc_config: GrpcClientConfig) -> Result<Self> {
37        let endpoint = Endpoint::try_from(url.to_string())?;
38        debug!("connecting to {}...", endpoint.uri());
39        let channel = grpc_config.connect_to(endpoint.clone()).await?;
40        let mut grpc = SymbologyGrpcClient::new(channel)
41            .max_decoding_message_size(100 * 1024 * 1024)
42            .max_encoding_message_size(100 * 1024 * 1024);
43        debug!("subscribing to symbology updates...");
44        let updates = grpc.subscribe_symbology(SubscribeSymbology {}).await?.into_inner();
45        Ok(Self {
46            grpc_endpoint: endpoint,
47            grpc_config,
48            grpc,
49            upstream_seqno: None,
50            updates,
51            store: SymbologyStore::new(),
52            ready: Synced::new(false),
53        })
54    }
55
56    pub async fn reconnect(&mut self) -> Result<()> {
57        debug!("reconnecting to {}...", self.grpc_endpoint.uri());
58        let channel = self.grpc_config.connect_to(self.grpc_endpoint.clone()).await?;
59        self.grpc = SymbologyGrpcClient::new(channel)
60            .max_decoding_message_size(100 * 1024 * 1024)
61            .max_encoding_message_size(100 * 1024 * 1024);
62        debug!("subscribing to symbology updates...");
63        self.grpc.subscribe_symbology(SubscribeSymbology {}).await?.into_inner();
64        Ok(())
65    }
66
67    pub fn handle(&self) -> SymbologyClientHandle {
68        SymbologyClientHandle { store: self.store.clone(), ready: self.ready.handle() }
69    }
70
71    pub async fn next(&mut self) -> Result<()> {
72        match self.updates.message().await {
73            Err(e) => bail!("error reading from stream: {:?}", e),
74            Ok(None) => bail!("symbology updates stream ended"),
75            Ok(Some(update)) => {
76                trace!("received update: {:?}", update);
77                if let Some(upstream_seqno) = self.upstream_seqno {
78                    if !update.sequence.is_next_in_sequence(&upstream_seqno) {
79                        bail!(
80                            "skipped sequence number: {} -> {}",
81                            upstream_seqno,
82                            update.sequence
83                        );
84                    }
85                }
86                self.upstream_seqno = Some(update.sequence);
87                self.store.apply_update(update)?;
88                self.ready.set(true);
89            }
90        }
91        Ok(())
92    }
93}