logo
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use crate::aliases::{Archive, BridgeConfig, GrpcClient};
use crate::server::make_server;
use anyhow::Result;
use daml_grpc::DamlGrpcClientBuilder;
use daml_lf::element::DamlArchive;
use daml_util::package::{ArchiveAutoNamingStyle, DamlPackages};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::Duration;
use tracing::{error, info};

/// Daml JSON<>GRPC API bridge.
pub struct Bridge {
    config: BridgeConfig,
}

impl Bridge {
    /// Create a new [`Bridge`].
    pub const fn new(config: BridgeConfig) -> Self {
        Self {
            config,
        }
    }

    /// Start the bridge.
    pub async fn run(&self) -> Result<()> {
        let grpc_client = Arc::new(
            DamlGrpcClientBuilder::uri(self.config.ledger_uri())
                .connect_timeout(Some(self.config.ledger_connect_timeout()))
                .timeout(self.config.ledger_timeout())
                .with_auth(self.config.ledger_token().to_owned())
                .connect()
                .await?,
        );
        let archive: Archive = Arc::new(RwLock::new(fetch_archive(&grpc_client).await?));
        let http_server = make_server(self.config.clone(), archive.clone(), grpc_client.clone())?;
        let package_refresher = refresh(archive.clone(), grpc_client.clone(), self.config.package_reload_interval());
        info!("Ready");
        let http_handle = tokio::spawn(http_server);
        let refresher_handle = tokio::spawn(package_refresher);
        let _result = tokio::join!(http_handle, refresher_handle);
        Ok(())
    }
}

/// Refresh the [`Archive`] from the ledger server.
async fn refresh(archive: Archive, grpc_client: GrpcClient, interval: Duration) {
    let mut timer = tokio::time::interval(interval);
    let _ = timer.tick().await;
    loop {
        let now = timer.tick().await;
        info!("refreshing Dar (Time now = {:?})", now);
        let new_archive = fetch_archive(&grpc_client).await;
        match new_archive {
            Ok(new_arch) => *archive.write().await = new_arch,
            Err(e) => {
                error!("error refreshing Dar from ledger: {}", e);
            },
        }
    }
}

async fn fetch_archive(grpc_client: &GrpcClient) -> Result<DamlArchive<'static>> {
    let all_packages = DamlPackages::from_ledger(grpc_client).await?;
    tokio::task::spawn_blocking(move || create_archive(all_packages)).await?
}

fn create_archive(packages: DamlPackages) -> Result<DamlArchive<'static>> {
    Ok(packages.into_dar(ArchiveAutoNamingStyle::Uuid)?.to_owned_archive()?)
}