cardano_net/
lib.rs

1mod blockfetch;
2mod chainsync;
3mod channel;
4mod stream;
5mod txsubmit;
6
7pub mod packet;
8
9use cardano_sdk::chaininfo::ChainInfo;
10use cardano_sdk::protocol::{
11    BlockFetch, ChainSync, DiffusionMode, Point, Protocol, TxSubmit, Version,
12};
13use std::collections::HashMap;
14use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
15use std::str::FromStr;
16use stream::StreamStatsShared;
17use thiserror::*;
18use tokio::net::TcpStream;
19use tracing::{debug, info};
20
21use trust_dns_resolver::{config::*, TokioAsyncResolver};
22
23pub use blockfetch::BlockFetcher;
24pub use chainsync::ChainIntersection;
25pub use channel::Channel;
26use packet::rawchan::{ChannelWriter, RawChannel};
27pub use packet::{frame::PacketBytes, ProtocolError};
28pub use txsubmit::SubmitNext;
29
30#[derive(Clone, Debug)]
31pub struct NetworkDescription {
32    pub anchor_hosts: Vec<(String, u16)>,
33    pub chain_info: ChainInfo,
34    pub net_versions: Vec<Version>,
35    pub known_points: Vec<(u64, Point)>,
36}
37
38impl NetworkDescription {
39    pub fn mainnet() -> Self {
40        Self {
41            anchor_hosts: vec![(String::from("relays-new.cardano-mainnet.iohk.io."), 3001)],
42            chain_info: ChainInfo::MAINNET,
43            net_versions: vec![Version::V6, Version::V7, Version::V8],
44            known_points: vec![(
45                1,
46                Point::from_raw(
47                    0,
48                    "f0f7892b5c333cffc4b3c4344de48af4cc63f55e44936196f365a9ef2244134f",
49                )
50                .unwrap(),
51            )],
52        }
53    }
54
55    pub fn testnet() -> Self {
56        Self {
57            anchor_hosts: vec![(String::from("relays-new.cardano-testnet.iohkdev.io."), 3001)],
58            chain_info: ChainInfo::TESTNET,
59            net_versions: vec![Version::V6, Version::V7, Version::V8],
60            known_points: vec![
61                (
62                    1,
63                    Point::from_raw(
64                        1031,
65                        "388a82f053603f3552717d61644a353188f2d5500f4c6354cc1ad27a36a7ea91",
66                    )
67                    .unwrap(),
68                ),
69                // shelley
70                (
71                    1597133,
72                    Point::from_raw(
73                        1598400,
74                        "02b1c561715da9e540411123a6135ee319b02f60b9a11a603d3305556c04329f",
75                    )
76                    .unwrap(),
77                ),
78                // after shelley
79                (
80                    2349176,
81                    Point::from_raw(
82                        19742400,
83                        "80095b5d08fe5a97581f4ba940cb91410eb88673f581fc61785608718553fab6",
84                    )
85                    .unwrap(),
86                ),
87            ],
88        }
89    }
90
91    pub fn preprod() -> Self {
92        Self {
93            anchor_hosts: vec![(String::from("preprod-node.world.dev.cardano.org."), 30000)],
94            chain_info: ChainInfo::PREPROD,
95            net_versions: vec![Version::V6, Version::V7, Version::V8],
96            known_points: vec![(
97                // shelley
98                46,
99                Point::from_raw(
100                    86400,
101                    "c4a1595c5cc7a31eda9e544986fe9387af4e3491afe0ca9a80714f01951bbd5c",
102                )
103                .unwrap(),
104            )],
105        }
106    }
107
108    pub fn preview() -> Self {
109        Self {
110            anchor_hosts: vec![(String::from("preview-node.world.dev.cardano.org."), 30002)],
111            chain_info: ChainInfo::PREVIEW,
112            net_versions: vec![Version::V6, Version::V7, Version::V8],
113            known_points: vec![(
114                // shelley
115                505,
116                Point::from_raw(
117                    86400,
118                    "283b30cedbfc17647564abb43012bd2f17871ec0be748535c38a52088ced6a9f",
119                )
120                .unwrap(),
121            )],
122        }
123    }
124
125    pub fn override_hosts(mut self, v: &[(String, u16)]) -> Self {
126        self.anchor_hosts = v.to_owned();
127        self
128    }
129}
130
131async fn resolve_name(dest: &str) -> Result<Vec<IpAddr>, ()> {
132    let ip = match Ipv4Addr::from_str(dest) {
133        Ok(addr) => Some(IpAddr::V4(addr)),
134        Err(_) => match Ipv6Addr::from_str(dest) {
135            Ok(addr6) => Some(IpAddr::V6(addr6)),
136            Err(_) => None,
137        },
138    };
139
140    match ip {
141        // possibly a host then
142        None => {
143            let resolver =
144                TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default())
145                    .unwrap();
146            let response = resolver.lookup_ip(dest).await.unwrap();
147            let addresses = response.iter().collect::<Vec<_>>();
148            Ok(addresses)
149        }
150        Some(ip) => Ok(vec![ip]),
151    }
152}
153
154async fn connect_to(
155    destinations: &[(String, u16)],
156) -> Result<(SocketAddr, TcpStream), Vec<std::io::Error>> {
157    let mut errors = Vec::new();
158
159    for (dest, port) in destinations {
160        let ip_addresses = resolve_name(dest).await.unwrap();
161
162        // try to connect from (resolved) ip addresses at the expected port
163        for ip_addr in ip_addresses {
164            let addr = SocketAddr::new(ip_addr, *port);
165            debug!("trying to connect to {}:{} ({})", dest, port, ip_addr);
166            match TcpStream::connect(&addr).await {
167                Err(e) => errors.push(e),
168                Ok(stream) => {
169                    info!("connected to {}:{} ({})", dest, port, ip_addr);
170                    return Ok((addr, stream));
171                }
172            }
173        }
174    }
175
176    return Err(errors);
177}
178
179pub struct NetworkHandle {
180    #[allow(dead_code)]
181    handle: tokio::task::JoinHandle<()>,
182
183    /// the socket address of the opposite side
184    pub sockaddr: SocketAddr,
185    /// chainsync sub protocol
186    pub chainsync: Channel<ChainSync>,
187    /// txsubmit sub protocol
188    pub txsubmit: Channel<TxSubmit>,
189    /// blockfetch sub protocol
190    pub blockfetch: Channel<BlockFetch>,
191    /// Stats for the stream
192    pub stats: StreamStatsShared,
193}
194
195#[derive(Debug, Error)]
196pub enum NetworkError {
197    #[error("protocol error {0}")]
198    ProtocolError(#[from] ProtocolError),
199    #[error("handshake error {0}")]
200    HandshakeError(#[from] stream::HandshakeError),
201    #[error("Connect I/O error {0:?}")]
202    ConnectError(Vec<std::io::Error>),
203}
204
205fn channel<P: Protocol>(
206    writer: tokio::sync::mpsc::Sender<PacketBytes>,
207) -> (ChannelWriter, Channel<P>) {
208    let (sender, receiver) = tokio::sync::mpsc::channel(16);
209    let channel = RawChannel::new(P::NUMBER, writer, receiver);
210    (ChannelWriter(sender), Channel::new(channel))
211}
212
213impl NetworkHandle {
214    /// Connect to the network with the expected parameters, perform the protocol handshake
215    /// and in the background wait for packet to send and receive.
216    pub async fn start(
217        network_description: &NetworkDescription,
218    ) -> Result<NetworkHandle, NetworkError> {
219        // try to connect to one of the known anchors in network_description
220        let (sockaddr, tcp) = connect_to(&network_description.anchor_hosts)
221            .await
222            .map_err(|e| NetworkError::ConnectError(e))?;
223
224        let (tx_sender, tx_receiver) = tokio::sync::mpsc::channel(64);
225
226        let (chainsync_w, chainsync) = channel(tx_sender.clone());
227        let (txsubmit_w, txsubmit) = channel(tx_sender.clone());
228        let (blockfetch_w, blockfetch) = channel(tx_sender);
229        let channels = HashMap::from([
230            (ChainSync::NUMBER, chainsync_w),
231            (TxSubmit::NUMBER, txsubmit_w),
232            (BlockFetch::NUMBER, blockfetch_w),
233        ]);
234
235        let stats = stream::StreamStatsShared::default();
236
237        // create a cardano stream, perform the handshake with the connected client
238        // and then create the receiving / sending loop
239        let mut stream = stream::Stream::new(stats.clone(), tcp, tx_receiver, channels);
240        stream
241            .handshake(
242                network_description.chain_info.protocol_magic,
243                DiffusionMode::InitiatorOnly,
244                &network_description.net_versions,
245            )
246            .await??;
247
248        let handle = tokio::spawn(async move { stream.process_fragment().await });
249
250        Ok(NetworkHandle {
251            sockaddr,
252            handle,
253            chainsync,
254            blockfetch,
255            txsubmit,
256            stats,
257        })
258    }
259
260    pub async fn stop(self) {
261        self.handle.abort()
262    }
263}