architect_sdk/symbology/
client.rs1use super::store::SymbologyStore;
2use crate::{
3 grpc::GrpcClientConfig,
4 synced::{Synced, SyncedHandle},
5};
6use anyhow::{bail, Result};
7use architect_api::{
8 grpc::service::symbology_client::SymbologyClient as SymbologyGrpcClient,
9 symbology::protocol::{SubscribeSymbology, SymbologyUpdate},
10 utils::sequence::SequenceIdAndNumber,
11};
12use log::{debug, trace};
13use tonic::{
14 transport::{Channel, Endpoint},
15 Streaming,
16};
17use url::Url;
18
19pub struct SymbologyClient {
20 grpc_endpoint: Endpoint,
21 grpc_config: GrpcClientConfig,
22 grpc: SymbologyGrpcClient<Channel>,
23 upstream_seqno: Option<SequenceIdAndNumber>,
24 updates: Streaming<SymbologyUpdate>,
25 pub store: SymbologyStore,
26 pub ready: Synced<bool>,
27}
28
29#[derive(Clone)]
30pub struct SymbologyClientHandle {
31 pub store: SymbologyStore,
32 pub ready: SyncedHandle<bool>,
33}
34
35impl SymbologyClient {
36 pub async fn connect(url: &Url, grpc_config: GrpcClientConfig) -> Result<Self> {
37 let endpoint = Endpoint::try_from(url.to_string())?;
38 debug!("connecting to {}...", endpoint.uri());
39 let channel = grpc_config.connect_to(endpoint.clone()).await?;
40 let mut grpc = SymbologyGrpcClient::new(channel)
41 .max_decoding_message_size(100 * 1024 * 1024)
42 .max_encoding_message_size(100 * 1024 * 1024);
43 debug!("subscribing to symbology updates...");
44 let updates = grpc.subscribe_symbology(SubscribeSymbology {}).await?.into_inner();
45 Ok(Self {
46 grpc_endpoint: endpoint,
47 grpc_config,
48 grpc,
49 upstream_seqno: None,
50 updates,
51 store: SymbologyStore::new(),
52 ready: Synced::new(false),
53 })
54 }
55
56 pub async fn reconnect(&mut self) -> Result<()> {
57 debug!("reconnecting to {}...", self.grpc_endpoint.uri());
58 let channel = self.grpc_config.connect_to(self.grpc_endpoint.clone()).await?;
59 self.grpc = SymbologyGrpcClient::new(channel)
60 .max_decoding_message_size(100 * 1024 * 1024)
61 .max_encoding_message_size(100 * 1024 * 1024);
62 debug!("subscribing to symbology updates...");
63 self.grpc.subscribe_symbology(SubscribeSymbology {}).await?.into_inner();
64 Ok(())
65 }
66
67 pub fn handle(&self) -> SymbologyClientHandle {
68 SymbologyClientHandle { store: self.store.clone(), ready: self.ready.handle() }
69 }
70
71 pub async fn next(&mut self) -> Result<()> {
72 match self.updates.message().await {
73 Err(e) => bail!("error reading from stream: {:?}", e),
74 Ok(None) => bail!("symbology updates stream ended"),
75 Ok(Some(update)) => {
76 trace!("received update: {:?}", update);
77 if let Some(upstream_seqno) = self.upstream_seqno {
78 if !update.sequence.is_next_in_sequence(&upstream_seqno) {
79 bail!(
80 "skipped sequence number: {} -> {}",
81 upstream_seqno,
82 update.sequence
83 );
84 }
85 }
86 self.upstream_seqno = Some(update.sequence);
87 self.store.apply_update(update)?;
88 self.ready.set(true);
89 }
90 }
91 Ok(())
92 }
93}