rosetta_docker/
lib.rs

1use anyhow::Result;
2use docker_api::conn::TtyChunk;
3use docker_api::opts::{
4    ContainerConnectionOpts, ContainerCreateOpts, ContainerListOpts, ContainerStopOpts, LogsOpts,
5    NetworkCreateOpts, NetworkListOpts, PublishPort,
6};
7use docker_api::{ApiVersion, Container, Docker, Network};
8use futures::stream::StreamExt;
9use rosetta_client::{Client, Signer, Wallet};
10use rosetta_core::{BlockchainClient, BlockchainConfig};
11use std::time::Duration;
12
13pub struct Env {
14    config: BlockchainConfig,
15    network: Network,
16    node: Container,
17    connector: Container,
18}
19
20impl Env {
21    pub async fn new(prefix: &str, mut config: BlockchainConfig) -> Result<Self> {
22        env_logger::try_init().ok();
23        let builder = EnvBuilder::new(prefix)?;
24        config.node_port = builder.random_port();
25        config.connector_port = builder.random_port();
26        log::info!("node: {}", config.node_port);
27        log::info!("connector: {}", config.connector_port);
28        builder
29            .stop_container(&builder.connector_name(&config))
30            .await?;
31        builder.stop_container(&builder.node_name(&config)).await?;
32        builder.delete_network(&builder.network_name()).await?;
33        let network = builder.create_network().await?;
34        let node = builder.run_node(&config, &network).await?;
35        let connector = builder.run_connector(&config, &network).await?;
36        Ok(Self {
37            config,
38            network,
39            node,
40            connector,
41        })
42    }
43
44    pub async fn node<T: BlockchainClient>(&self) -> Result<T> {
45        let addr = format!("127.0.0.1:{}", self.config.node_port);
46        T::new(self.config.clone(), &addr).await
47    }
48
49    pub fn connector_url(&self) -> String {
50        format!("http://127.0.0.1:{}", self.config.connector_port)
51    }
52
53    pub fn connector(&self) -> Result<Client> {
54        Client::new(&self.connector_url())
55    }
56
57    pub fn ephemeral_wallet(&self) -> Result<Wallet> {
58        let client = self.connector()?;
59        let signer = Signer::generate()?;
60        Wallet::new(self.config.clone(), &signer, client)
61    }
62
63    pub async fn shutdown(&self) -> Result<()> {
64        let opts = ContainerStopOpts::builder().build();
65        self.connector.stop(&opts).await?;
66        self.node.stop(&opts).await?;
67        self.network.delete().await?;
68        Ok(())
69    }
70}
71
72struct EnvBuilder<'a> {
73    prefix: &'a str,
74    docker: Docker,
75}
76
77impl<'a> EnvBuilder<'a> {
78    pub fn new(prefix: &'a str) -> Result<Self> {
79        let version = ApiVersion::new(1, Some(41), None);
80        #[cfg(unix)]
81        let docker = Docker::unix_versioned("/var/run/docker.sock", version);
82        #[cfg(not(unix))]
83        let docker = Docker::tcp_versioned("127.0.0.1:8080", version)?;
84        Ok(Self { prefix, docker })
85    }
86
87    fn random_port(&self) -> u16 {
88        let mut bytes = [0; 2];
89        getrandom::getrandom(&mut bytes).unwrap();
90        u16::from_le_bytes(bytes)
91    }
92
93    fn network_name(&self) -> String {
94        format!("{}-rosetta-docker", self.prefix)
95    }
96
97    fn node_name(&self, config: &BlockchainConfig) -> String {
98        format!(
99            "{}-node-{}-{}",
100            self.prefix, config.blockchain, config.network
101        )
102    }
103
104    fn connector_name(&self, config: &BlockchainConfig) -> String {
105        format!(
106            "{}-connector-{}-{}",
107            self.prefix, config.blockchain, config.network
108        )
109    }
110
111    async fn create_network(&self) -> Result<Network> {
112        let opts = NetworkCreateOpts::builder(self.network_name()).build();
113        let network = self.docker.networks().create(&opts).await?;
114        let id = network.id().clone();
115        Ok(Network::new(self.docker.clone(), id))
116    }
117
118    async fn delete_network(&self, name: &str) -> Result<()> {
119        let opts = NetworkListOpts::builder().build();
120        for network in self.docker.networks().list(&opts).await? {
121            if network.name.as_ref().unwrap() == name {
122                let network = Network::new(self.docker.clone(), network.id.unwrap());
123                network.delete().await.ok();
124            }
125        }
126        Ok(())
127    }
128
129    async fn stop_container(&self, name: &str) -> Result<()> {
130        let opts = ContainerListOpts::builder().all(true).build();
131        for container in self.docker.containers().list(&opts).await? {
132            if container
133                .names
134                .as_ref()
135                .unwrap()
136                .iter()
137                .any(|n| n.as_str().ends_with(name))
138            {
139                let container = Container::new(self.docker.clone(), container.id.unwrap());
140                log::info!("stopping {}", name);
141                container
142                    .stop(&ContainerStopOpts::builder().build())
143                    .await?;
144                container.delete().await.ok();
145                break;
146            }
147        }
148        Ok(())
149    }
150
151    async fn run_container(
152        &self,
153        name: String,
154        opts: &ContainerCreateOpts,
155        network: &Network,
156    ) -> Result<Container> {
157        log::info!("creating {}", name);
158        let id = self.docker.containers().create(opts).await?.id().clone();
159        let container = Container::new(self.docker.clone(), id.clone());
160
161        let opts = ContainerConnectionOpts::builder(&id).build();
162        network.connect(&opts).await?;
163
164        container.start().await?;
165
166        log::info!("starting {}", name);
167        let container = Container::new(self.docker.clone(), id.clone());
168        tokio::task::spawn(async move {
169            let opts = LogsOpts::builder()
170                .all()
171                .follow(true)
172                .stdout(true)
173                .stderr(true)
174                .build();
175            let mut logs = container.logs(&opts);
176            while let Some(chunk) = logs.next().await {
177                match chunk {
178                    Ok(TtyChunk::StdOut(stdout)) => {
179                        let stdout = std::str::from_utf8(&stdout).unwrap_or_default();
180                        log::info!("{}: stdout: {}", name, stdout);
181                    }
182                    Ok(TtyChunk::StdErr(stderr)) => {
183                        let stderr = std::str::from_utf8(&stderr).unwrap_or_default();
184                        log::info!("{}: stderr: {}", name, stderr);
185                    }
186                    Err(err) => {
187                        log::error!("{}", err);
188                    }
189                    Ok(TtyChunk::StdIn(_)) => unreachable!(),
190                }
191            }
192            log::info!("{}: exited", name);
193        });
194
195        let container = Container::new(self.docker.clone(), id.clone());
196        loop {
197            match health(&container).await? {
198                Some(Health::Unhealthy) => anyhow::bail!("healthcheck reports unhealthy"),
199                Some(Health::Starting) => {
200                    tokio::time::sleep(Duration::from_millis(100)).await;
201                }
202                _ => break,
203            }
204        }
205
206        Ok(container)
207    }
208
209    async fn run_node(&self, config: &BlockchainConfig, network: &Network) -> Result<Container> {
210        let name = self.node_name(config);
211        let mut opts = ContainerCreateOpts::builder()
212            .name(&name)
213            .image(config.node_image)
214            .command((config.node_command)(config.network, config.node_port))
215            .auto_remove(true)
216            .attach_stdout(true)
217            .attach_stderr(true)
218            .publish(PublishPort::tcp(config.node_port as _))
219            .expose(
220                PublishPort::tcp(config.node_port as _),
221                config.node_port as _,
222            );
223        for port in config.node_additional_ports {
224            let port = *port as u32;
225            opts = opts.expose(PublishPort::tcp(port), port);
226        }
227        let container = self.run_container(name, &opts.build(), network).await?;
228        //wait_for_http(&format!("http://127.0.0.1:{}", config.node_port)).await?;
229        tokio::time::sleep(Duration::from_secs(30)).await;
230        Ok(container)
231    }
232
233    async fn run_connector(
234        &self,
235        config: &BlockchainConfig,
236        network: &Network,
237    ) -> Result<Container> {
238        let name = self.connector_name(config);
239        let link = self.node_name(config);
240        let opts = ContainerCreateOpts::builder()
241            .name(&name)
242            .image(format!("analoglabs/connector-{}", config.blockchain))
243            .command(vec![
244                format!("--network={}", config.network),
245                format!("--addr=0.0.0.0:{}", config.connector_port),
246                format!("--node-addr={}:{}", link, config.node_port),
247                "--path=/data".into(),
248            ])
249            .auto_remove(true)
250            .attach_stdout(true)
251            .attach_stderr(true)
252            .expose(
253                PublishPort::tcp(config.connector_port as _),
254                config.connector_port as _,
255            )
256            .build();
257        let container = self.run_container(name, &opts, network).await?;
258        wait_for_http(&format!("http://127.0.0.1:{}", config.connector_port)).await?;
259        Ok(container)
260    }
261}
262
/// Health-check state of a container, parsed from the status string found in
/// the container's inspect data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Health {
    /// Status string `"none"`.
    None,
    /// Status string `"starting"`.
    Starting,
    /// Status string `"healthy"`.
    Healthy,
    /// Status string `"unhealthy"`.
    Unhealthy,
}
270
271async fn health(container: &Container) -> Result<Option<Health>> {
272    let inspect = container.inspect().await?;
273    let status = inspect
274        .state
275        .and_then(|state| state.health)
276        .and_then(|health| health.status);
277    let Some(status) = status else {
278        return Ok(None);
279    };
280    Ok(Some(match status.as_str() {
281        "none" => Health::None,
282        "starting" => Health::Starting,
283        "healthy" => Health::Healthy,
284        "unhealthy" => Health::Unhealthy,
285        status => anyhow::bail!("unknown status {}", status),
286    }))
287}
288
289async fn wait_for_http(url: &str) -> Result<()> {
290    loop {
291        match surf::get(url).await {
292            Ok(_) => {
293                break;
294            }
295            Err(err) => {
296                log::error!("{}", err);
297                tokio::time::sleep(Duration::from_millis(500)).await;
298            }
299        }
300    }
301    Ok(())
302}