cargo_tangle/command/debug/
spawn.rs

1use crate::command::deploy::mbsm::deploy_mbsm_if_needed;
2use crate::command::deploy::tangle::deploy_tangle;
3use crate::command::register::register;
4use blueprint_chain_setup::tangle::testnet::SubstrateNode;
5use blueprint_clients::tangle::client::TangleClient;
6use blueprint_crypto::sp_core::{SpEcdsa, SpSr25519};
7use blueprint_crypto::tangle_pair_signer::TanglePairSigner;
8use blueprint_keystore::backends::Backend;
9use blueprint_keystore::{Keystore, KeystoreConfig};
10use blueprint_manager::blueprint_auth::db::RocksDb;
11use blueprint_manager::config::{AuthProxyOpts, BlueprintManagerConfig};
12use blueprint_manager::executor::run_auth_proxy;
13use blueprint_manager::rt::hypervisor::ServiceVmConfig;
14use blueprint_manager::rt::hypervisor::net::NetworkManager;
15use blueprint_manager::rt::service::Service;
16use blueprint_manager::sources::{BlueprintArgs, BlueprintEnvVars};
17use blueprint_runner::config::{BlueprintEnvironment, Protocol, SupportedChains};
18use nix::sys::termios;
19use nix::sys::termios::{InputFlags, LocalFlags, SetArg};
20use std::io::{self, Read, Write};
21use std::path::{Path, PathBuf};
22use std::{fs, thread};
23use tracing::{error, info, warn};
24use url::Url;
25
26async fn setup_tangle_node(
27    tmp_path: &Path,
28    mut http_rpc_url: Option<Url>,
29    mut ws_rpc_url: Option<Url>,
30) -> color_eyre::Result<(Option<SubstrateNode>, Url, Url)> {
31    let mut node = None;
32    if http_rpc_url.is_none() || ws_rpc_url.is_none() {
33        info!("Spawning Tangle node");
34
35        match blueprint_chain_setup::tangle::run(blueprint_chain_setup::tangle::NodeConfig::new(
36            false,
37        ))
38        .await
39        {
40            Ok(tangle) => {
41                let http = format!("http://127.0.0.1:{}", tangle.ws_port());
42                let ws = format!("ws://127.0.0.1:{}", tangle.ws_port());
43
44                info!("Tangle node running on {http} / {ws}");
45
46                http_rpc_url = Some(Url::parse(&http)?);
47                ws_rpc_url = Some(Url::parse(&ws)?);
48
49                node = Some(tangle);
50            }
51            Err(e) => {
52                error!("Failed to setup local Tangle node: {e}");
53                return Err(e.into());
54            }
55        }
56    }
57
58    let http = http_rpc_url.unwrap();
59    let ws = ws_rpc_url.unwrap();
60
61    let alice_keystore_config = KeystoreConfig::new().fs_root(tmp_path.join("keystore"));
62    let alice_keystore = Keystore::new(alice_keystore_config)?;
63
64    let alice_sr25519_public = alice_keystore.first_local::<SpSr25519>()?;
65    let alice_sr25519_pair = alice_keystore.get_secret::<SpSr25519>(&alice_sr25519_public)?;
66    let alice_sr25519_signer = TanglePairSigner::new(alice_sr25519_pair.0);
67
68    let alice_ecdsa_public = alice_keystore.first_local::<SpEcdsa>()?;
69    let alice_ecdsa_pair = alice_keystore.get_secret::<SpEcdsa>(&alice_ecdsa_public)?;
70    let alice_ecdsa_signer = TanglePairSigner::new(alice_ecdsa_pair.0);
71    let alice_alloy_key = alice_ecdsa_signer
72        .alloy_key()
73        .map_err(|e| color_eyre::Report::msg(format!("Failed to get Alice's Alloy key: {}", e)))?;
74
75    let mut env = BlueprintEnvironment::default();
76    env.http_rpc_endpoint = http.clone();
77    env.ws_rpc_endpoint = ws.clone();
78    let alice_client = TangleClient::with_keystore(env, alice_keystore).await?;
79
80    deploy_mbsm_if_needed(
81        ws.clone(),
82        &alice_client,
83        &alice_sr25519_signer,
84        alice_alloy_key.clone(),
85    )
86    .await?;
87
88    Ok((node, http, ws))
89}
90
91/// Spawns a Tangle testnet and virtual machine for the given blueprint
92///
93/// # Errors
94///
95/// * Unable to spawn/connect to the Tangle node
96///
97/// See also:
98///
99/// * [`deploy_mbsm_if_needed()`]
100/// * [`NetworkManager::new()`]
101/// * [`run_auth_proxy()`]
102/// * [`register()`]
103/// * [`Service::new()`]
104/// * [`Service::start()`]
105#[allow(clippy::too_many_arguments, clippy::missing_panics_doc)]
106pub async fn execute(
107    http_rpc_url: Option<Url>,
108    ws_rpc_url: Option<Url>,
109    manifest_path: PathBuf,
110    package: Option<String>,
111    id: u32,
112    service_name: String,
113    binary: PathBuf,
114    protocol: Protocol,
115    _verify_network_connection: bool,
116    no_vm: bool,
117) -> color_eyre::Result<()> {
118    let mut manager_config = BlueprintManagerConfig::default();
119
120    let tmp = tempfile::tempdir()?;
121    manager_config.data_dir = tmp.path().join("data");
122    manager_config.cache_dir = tmp.path().join("cache");
123    manager_config.runtime_dir = tmp.path().join("runtime");
124    manager_config.keystore_uri = tmp.path().join("keystore").to_string_lossy().into();
125
126    manager_config.verify_directories_exist()?;
127
128    blueprint_testing_utils::tangle::keys::inject_tangle_key(
129        &manager_config.keystore_uri,
130        "//Alice",
131    )?;
132
133    let (_node, http, ws) = setup_tangle_node(tmp.path(), http_rpc_url, ws_rpc_url).await?;
134    Box::pin(deploy_tangle(
135        http.to_string(),
136        ws.to_string(),
137        package,
138        false,
139        Some(PathBuf::from(&manager_config.keystore_uri)),
140        manifest_path,
141    ))
142    .await?;
143    register(ws.to_string(), 0, manager_config.keystore_uri.clone(), "").await?;
144
145    let (db, auth_proxy_task) =
146        run_auth_proxy(manager_config.data_dir.clone(), AuthProxyOpts::default()).await?;
147
148    let args = BlueprintArgs::new(&manager_config);
149    let env = BlueprintEnvVars {
150        http_rpc_endpoint: http,
151        ws_rpc_endpoint: ws,
152        keystore_uri: manager_config.keystore_uri.clone(),
153        data_dir: manager_config.data_dir.clone(),
154        blueprint_id: 0,
155        service_id: 0,
156        protocol,
157        chain: Some(SupportedChains::LocalTestnet),
158        bootnodes: String::new(),
159        registration_mode: false,
160        // Set later
161        bridge_socket_path: None,
162    };
163
164    let mut service = if no_vm {
165        setup_without_vm(manager_config, &service_name, binary, db, env, args)?
166    } else {
167        setup_with_vm(manager_config, &service_name, id, binary, db, env, args).await?
168    };
169
170    // TODO: Check is_alive
171    let _is_alive = Box::pin(service.start().await?.unwrap());
172
173    tokio::select! {
174        _ = tokio::signal::ctrl_c() => {}
175        _ = auth_proxy_task => {
176            warn!("Auth proxy shutdown");
177        },
178    }
179
180    service.shutdown().await?;
181
182    Ok(())
183}
184
185fn setup_without_vm(
186    manager_config: BlueprintManagerConfig,
187    service_name: &str,
188    binary: PathBuf,
189    db: RocksDb,
190    env: BlueprintEnvVars,
191    args: BlueprintArgs,
192) -> color_eyre::Result<Service> {
193    let service = Service::new_native(
194        db,
195        manager_config.runtime_dir,
196        service_name,
197        binary,
198        env,
199        args,
200    )?;
201    Ok(service)
202}
203
204async fn setup_with_vm(
205    manager_config: BlueprintManagerConfig,
206    service_name: &str,
207    id: u32,
208    binary: PathBuf,
209    db: RocksDb,
210    env: BlueprintEnvVars,
211    args: BlueprintArgs,
212) -> color_eyre::Result<Service> {
213    let network_candidates = manager_config
214        .default_address_pool
215        .hosts()
216        .filter(|ip| ip.octets()[3] != 0 && ip.octets()[3] != 255)
217        .collect();
218    let network_manager = NetworkManager::new(network_candidates).await.map_err(|e| {
219        error!("Failed to create network manager: {e}");
220        e
221    })?;
222
223    let service = Service::new(
224        ServiceVmConfig {
225            id,
226            pty: true,
227            ..Default::default()
228        },
229        network_manager,
230        db,
231        manager_config.data_dir,
232        manager_config.keystore_uri,
233        manager_config.cache_dir,
234        &manager_config.runtime_dir,
235        service_name,
236        binary,
237        env,
238        args,
239    )
240    .await?;
241
242    let pty = service
243        .hypervisor()
244        .expect("is hypervisor service")
245        .pty()
246        .await?
247        .unwrap();
248    info!("VM serial output to: {}", pty.display());
249
250    let pty = fs::OpenOptions::new().read(true).write(true).open(pty)?;
251
252    set_raw_mode(&pty)?;
253
254    let mut pty_reader = pty.try_clone()?;
255    let mut pty_writer = pty;
256
257    let stdin_to_pty = thread::spawn(move || {
258        let mut stdin = std::io::stdin();
259        let mut buffer = [0u8; 1024];
260        loop {
261            match stdin.read(&mut buffer) {
262                Ok(0) | Err(_) => break,
263                Ok(n) => {
264                    if pty_writer.write_all(&buffer[..n]).is_err() {
265                        break;
266                    }
267                }
268            }
269        }
270    });
271
272    let pty_to_stdout = thread::spawn(move || {
273        let mut stdout = std::io::stdout();
274        let mut buffer = [0u8; 1024];
275        loop {
276            match pty_reader.read(&mut buffer) {
277                Ok(0) | Err(_) => break,
278                Ok(n) => {
279                    if stdout.write_all(&buffer[..n]).is_err() {
280                        break;
281                    }
282                    stdout.flush().ok();
283                }
284            }
285        }
286    });
287
288    stdin_to_pty.join().unwrap();
289    pty_to_stdout.join().unwrap();
290
291    Ok(service)
292}
293
294fn set_raw_mode(fd: &fs::File) -> io::Result<()> {
295    let mut termios = termios::tcgetattr(fd)?;
296
297    termios.input_flags &= !(InputFlags::ICRNL | InputFlags::IXON);
298    termios.local_flags &= !(LocalFlags::ICANON | LocalFlags::ECHO | LocalFlags::ISIG);
299
300    termios::tcsetattr(fd, SetArg::TCSANOW, &termios)?;
301
302    Ok(())
303}