1use anyhow::Result;
2use docker_api::conn::TtyChunk;
3use docker_api::opts::{
4 ContainerConnectionOpts, ContainerCreateOpts, ContainerListOpts, ContainerStopOpts, LogsOpts,
5 NetworkCreateOpts, NetworkListOpts, PublishPort,
6};
7use docker_api::{ApiVersion, Container, Docker, Network};
8use futures::stream::StreamExt;
9use rosetta_client::{Client, Signer, Wallet};
10use rosetta_core::{BlockchainClient, BlockchainConfig};
11use std::time::Duration;
12
13pub struct Env {
14 config: BlockchainConfig,
15 network: Network,
16 node: Container,
17 connector: Container,
18}
19
20impl Env {
21 pub async fn new(prefix: &str, mut config: BlockchainConfig) -> Result<Self> {
22 env_logger::try_init().ok();
23 let builder = EnvBuilder::new(prefix)?;
24 config.node_port = builder.random_port();
25 config.connector_port = builder.random_port();
26 log::info!("node: {}", config.node_port);
27 log::info!("connector: {}", config.connector_port);
28 builder
29 .stop_container(&builder.connector_name(&config))
30 .await?;
31 builder.stop_container(&builder.node_name(&config)).await?;
32 builder.delete_network(&builder.network_name()).await?;
33 let network = builder.create_network().await?;
34 let node = builder.run_node(&config, &network).await?;
35 let connector = builder.run_connector(&config, &network).await?;
36 Ok(Self {
37 config,
38 network,
39 node,
40 connector,
41 })
42 }
43
44 pub async fn node<T: BlockchainClient>(&self) -> Result<T> {
45 let addr = format!("127.0.0.1:{}", self.config.node_port);
46 T::new(self.config.clone(), &addr).await
47 }
48
49 pub fn connector_url(&self) -> String {
50 format!("http://127.0.0.1:{}", self.config.connector_port)
51 }
52
53 pub fn connector(&self) -> Result<Client> {
54 Client::new(&self.connector_url())
55 }
56
57 pub fn ephemeral_wallet(&self) -> Result<Wallet> {
58 let client = self.connector()?;
59 let signer = Signer::generate()?;
60 Wallet::new(self.config.clone(), &signer, client)
61 }
62
63 pub async fn shutdown(&self) -> Result<()> {
64 let opts = ContainerStopOpts::builder().build();
65 self.connector.stop(&opts).await?;
66 self.node.stop(&opts).await?;
67 self.network.delete().await?;
68 Ok(())
69 }
70}
71
72struct EnvBuilder<'a> {
73 prefix: &'a str,
74 docker: Docker,
75}
76
77impl<'a> EnvBuilder<'a> {
78 pub fn new(prefix: &'a str) -> Result<Self> {
79 let version = ApiVersion::new(1, Some(41), None);
80 #[cfg(unix)]
81 let docker = Docker::unix_versioned("/var/run/docker.sock", version);
82 #[cfg(not(unix))]
83 let docker = Docker::tcp_versioned("127.0.0.1:8080", version)?;
84 Ok(Self { prefix, docker })
85 }
86
87 fn random_port(&self) -> u16 {
88 let mut bytes = [0; 2];
89 getrandom::getrandom(&mut bytes).unwrap();
90 u16::from_le_bytes(bytes)
91 }
92
93 fn network_name(&self) -> String {
94 format!("{}-rosetta-docker", self.prefix)
95 }
96
97 fn node_name(&self, config: &BlockchainConfig) -> String {
98 format!(
99 "{}-node-{}-{}",
100 self.prefix, config.blockchain, config.network
101 )
102 }
103
104 fn connector_name(&self, config: &BlockchainConfig) -> String {
105 format!(
106 "{}-connector-{}-{}",
107 self.prefix, config.blockchain, config.network
108 )
109 }
110
111 async fn create_network(&self) -> Result<Network> {
112 let opts = NetworkCreateOpts::builder(self.network_name()).build();
113 let network = self.docker.networks().create(&opts).await?;
114 let id = network.id().clone();
115 Ok(Network::new(self.docker.clone(), id))
116 }
117
118 async fn delete_network(&self, name: &str) -> Result<()> {
119 let opts = NetworkListOpts::builder().build();
120 for network in self.docker.networks().list(&opts).await? {
121 if network.name.as_ref().unwrap() == name {
122 let network = Network::new(self.docker.clone(), network.id.unwrap());
123 network.delete().await.ok();
124 }
125 }
126 Ok(())
127 }
128
129 async fn stop_container(&self, name: &str) -> Result<()> {
130 let opts = ContainerListOpts::builder().all(true).build();
131 for container in self.docker.containers().list(&opts).await? {
132 if container
133 .names
134 .as_ref()
135 .unwrap()
136 .iter()
137 .any(|n| n.as_str().ends_with(name))
138 {
139 let container = Container::new(self.docker.clone(), container.id.unwrap());
140 log::info!("stopping {}", name);
141 container
142 .stop(&ContainerStopOpts::builder().build())
143 .await?;
144 container.delete().await.ok();
145 break;
146 }
147 }
148 Ok(())
149 }
150
151 async fn run_container(
152 &self,
153 name: String,
154 opts: &ContainerCreateOpts,
155 network: &Network,
156 ) -> Result<Container> {
157 log::info!("creating {}", name);
158 let id = self.docker.containers().create(opts).await?.id().clone();
159 let container = Container::new(self.docker.clone(), id.clone());
160
161 let opts = ContainerConnectionOpts::builder(&id).build();
162 network.connect(&opts).await?;
163
164 container.start().await?;
165
166 log::info!("starting {}", name);
167 let container = Container::new(self.docker.clone(), id.clone());
168 tokio::task::spawn(async move {
169 let opts = LogsOpts::builder()
170 .all()
171 .follow(true)
172 .stdout(true)
173 .stderr(true)
174 .build();
175 let mut logs = container.logs(&opts);
176 while let Some(chunk) = logs.next().await {
177 match chunk {
178 Ok(TtyChunk::StdOut(stdout)) => {
179 let stdout = std::str::from_utf8(&stdout).unwrap_or_default();
180 log::info!("{}: stdout: {}", name, stdout);
181 }
182 Ok(TtyChunk::StdErr(stderr)) => {
183 let stderr = std::str::from_utf8(&stderr).unwrap_or_default();
184 log::info!("{}: stderr: {}", name, stderr);
185 }
186 Err(err) => {
187 log::error!("{}", err);
188 }
189 Ok(TtyChunk::StdIn(_)) => unreachable!(),
190 }
191 }
192 log::info!("{}: exited", name);
193 });
194
195 let container = Container::new(self.docker.clone(), id.clone());
196 loop {
197 match health(&container).await? {
198 Some(Health::Unhealthy) => anyhow::bail!("healthcheck reports unhealthy"),
199 Some(Health::Starting) => {
200 tokio::time::sleep(Duration::from_millis(100)).await;
201 }
202 _ => break,
203 }
204 }
205
206 Ok(container)
207 }
208
209 async fn run_node(&self, config: &BlockchainConfig, network: &Network) -> Result<Container> {
210 let name = self.node_name(config);
211 let mut opts = ContainerCreateOpts::builder()
212 .name(&name)
213 .image(config.node_image)
214 .command((config.node_command)(config.network, config.node_port))
215 .auto_remove(true)
216 .attach_stdout(true)
217 .attach_stderr(true)
218 .publish(PublishPort::tcp(config.node_port as _))
219 .expose(
220 PublishPort::tcp(config.node_port as _),
221 config.node_port as _,
222 );
223 for port in config.node_additional_ports {
224 let port = *port as u32;
225 opts = opts.expose(PublishPort::tcp(port), port);
226 }
227 let container = self.run_container(name, &opts.build(), network).await?;
228 tokio::time::sleep(Duration::from_secs(30)).await;
230 Ok(container)
231 }
232
233 async fn run_connector(
234 &self,
235 config: &BlockchainConfig,
236 network: &Network,
237 ) -> Result<Container> {
238 let name = self.connector_name(config);
239 let link = self.node_name(config);
240 let opts = ContainerCreateOpts::builder()
241 .name(&name)
242 .image(format!("analoglabs/connector-{}", config.blockchain))
243 .command(vec![
244 format!("--network={}", config.network),
245 format!("--addr=0.0.0.0:{}", config.connector_port),
246 format!("--node-addr={}:{}", link, config.node_port),
247 "--path=/data".into(),
248 ])
249 .auto_remove(true)
250 .attach_stdout(true)
251 .attach_stderr(true)
252 .expose(
253 PublishPort::tcp(config.connector_port as _),
254 config.connector_port as _,
255 )
256 .build();
257 let container = self.run_container(name, &opts, network).await?;
258 wait_for_http(&format!("http://127.0.0.1:{}", config.connector_port)).await?;
259 Ok(container)
260 }
261}
262
/// Healthcheck state of a container, one variant per status string
/// recognised by `health`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Health {
    /// Status string `none`.
    None,
    /// Status string `starting`.
    Starting,
    /// Status string `healthy`.
    Healthy,
    /// Status string `unhealthy`.
    Unhealthy,
}
270
271async fn health(container: &Container) -> Result<Option<Health>> {
272 let inspect = container.inspect().await?;
273 let status = inspect
274 .state
275 .and_then(|state| state.health)
276 .and_then(|health| health.status);
277 let Some(status) = status else {
278 return Ok(None);
279 };
280 Ok(Some(match status.as_str() {
281 "none" => Health::None,
282 "starting" => Health::Starting,
283 "healthy" => Health::Healthy,
284 "unhealthy" => Health::Unhealthy,
285 status => anyhow::bail!("unknown status {}", status),
286 }))
287}
288
289async fn wait_for_http(url: &str) -> Result<()> {
290 loop {
291 match surf::get(url).await {
292 Ok(_) => {
293 break;
294 }
295 Err(err) => {
296 log::error!("{}", err);
297 tokio::time::sleep(Duration::from_millis(500)).await;
298 }
299 }
300 }
301 Ok(())
302}