daml_bridge/
bridge.rs

1use crate::aliases::{Archive, BridgeConfig, GrpcClient};
2use crate::server::make_server;
3use anyhow::Result;
4use daml_grpc::DamlGrpcClientBuilder;
5use daml_lf::element::DamlArchive;
6use daml_util::package::{ArchiveAutoNamingStyle, DamlPackages};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use tokio::time::Duration;
10use tracing::{error, info};
11
12/// Daml JSON<>GRPC API bridge.
13pub struct Bridge {
14    config: BridgeConfig,
15}
16
17impl Bridge {
18    /// Create a new [`Bridge`].
19    pub const fn new(config: BridgeConfig) -> Self {
20        Self {
21            config,
22        }
23    }
24
25    /// Start the bridge.
26    pub async fn run(&self) -> Result<()> {
27        let grpc_client = Arc::new(
28            DamlGrpcClientBuilder::uri(self.config.ledger_uri())
29                .connect_timeout(Some(self.config.ledger_connect_timeout()))
30                .timeout(self.config.ledger_timeout())
31                .with_auth(self.config.ledger_token().to_owned())
32                .connect()
33                .await?,
34        );
35        let archive: Archive = Arc::new(RwLock::new(fetch_archive(&grpc_client).await?));
36        let http_server = make_server(self.config.clone(), archive.clone(), grpc_client.clone())?;
37        let package_refresher = refresh(archive.clone(), grpc_client.clone(), self.config.package_reload_interval());
38        info!("Ready");
39        let http_handle = tokio::spawn(http_server);
40        let refresher_handle = tokio::spawn(package_refresher);
41        let _result = tokio::join!(http_handle, refresher_handle);
42        Ok(())
43    }
44}
45
46/// Refresh the [`Archive`] from the ledger server.
47async fn refresh(archive: Archive, grpc_client: GrpcClient, interval: Duration) {
48    let mut timer = tokio::time::interval(interval);
49    let _ = timer.tick().await;
50    loop {
51        let now = timer.tick().await;
52        info!("refreshing Dar (Time now = {:?})", now);
53        let new_archive = fetch_archive(&grpc_client).await;
54        match new_archive {
55            Ok(new_arch) => *archive.write().await = new_arch,
56            Err(e) => {
57                error!("error refreshing Dar from ledger: {}", e);
58            },
59        }
60    }
61}
62
63async fn fetch_archive(grpc_client: &GrpcClient) -> Result<DamlArchive<'static>> {
64    let all_packages = DamlPackages::from_ledger(grpc_client).await?;
65    tokio::task::spawn_blocking(move || create_archive(all_packages)).await?
66}
67
68fn create_archive(packages: DamlPackages) -> Result<DamlArchive<'static>> {
69    Ok(packages.into_dar(ArchiveAutoNamingStyle::Uuid)?.to_owned_archive()?)
70}