1mod blockfetch;
2mod chainsync;
3mod channel;
4mod stream;
5mod txsubmit;
6
7pub mod packet;
8
9use cardano_sdk::chaininfo::ChainInfo;
10use cardano_sdk::protocol::{
11 BlockFetch, ChainSync, DiffusionMode, Point, Protocol, TxSubmit, Version,
12};
13use std::collections::HashMap;
14use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
15use std::str::FromStr;
16use stream::StreamStatsShared;
17use thiserror::*;
18use tokio::net::TcpStream;
19use tracing::{debug, info};
20
21use trust_dns_resolver::{config::*, TokioAsyncResolver};
22
23pub use blockfetch::BlockFetcher;
24pub use chainsync::ChainIntersection;
25pub use channel::Channel;
26use packet::rawchan::{ChannelWriter, RawChannel};
27pub use packet::{frame::PacketBytes, ProtocolError};
28pub use txsubmit::SubmitNext;
29
30#[derive(Clone, Debug)]
31pub struct NetworkDescription {
32 pub anchor_hosts: Vec<(String, u16)>,
33 pub chain_info: ChainInfo,
34 pub net_versions: Vec<Version>,
35 pub known_points: Vec<(u64, Point)>,
36}
37
38impl NetworkDescription {
39 pub fn mainnet() -> Self {
40 Self {
41 anchor_hosts: vec![(String::from("relays-new.cardano-mainnet.iohk.io."), 3001)],
42 chain_info: ChainInfo::MAINNET,
43 net_versions: vec![Version::V6, Version::V7, Version::V8],
44 known_points: vec![(
45 1,
46 Point::from_raw(
47 0,
48 "f0f7892b5c333cffc4b3c4344de48af4cc63f55e44936196f365a9ef2244134f",
49 )
50 .unwrap(),
51 )],
52 }
53 }
54
55 pub fn testnet() -> Self {
56 Self {
57 anchor_hosts: vec![(String::from("relays-new.cardano-testnet.iohkdev.io."), 3001)],
58 chain_info: ChainInfo::TESTNET,
59 net_versions: vec![Version::V6, Version::V7, Version::V8],
60 known_points: vec![
61 (
62 1,
63 Point::from_raw(
64 1031,
65 "388a82f053603f3552717d61644a353188f2d5500f4c6354cc1ad27a36a7ea91",
66 )
67 .unwrap(),
68 ),
69 (
71 1597133,
72 Point::from_raw(
73 1598400,
74 "02b1c561715da9e540411123a6135ee319b02f60b9a11a603d3305556c04329f",
75 )
76 .unwrap(),
77 ),
78 (
80 2349176,
81 Point::from_raw(
82 19742400,
83 "80095b5d08fe5a97581f4ba940cb91410eb88673f581fc61785608718553fab6",
84 )
85 .unwrap(),
86 ),
87 ],
88 }
89 }
90
91 pub fn preprod() -> Self {
92 Self {
93 anchor_hosts: vec![(String::from("preprod-node.world.dev.cardano.org."), 30000)],
94 chain_info: ChainInfo::PREPROD,
95 net_versions: vec![Version::V6, Version::V7, Version::V8],
96 known_points: vec![(
97 46,
99 Point::from_raw(
100 86400,
101 "c4a1595c5cc7a31eda9e544986fe9387af4e3491afe0ca9a80714f01951bbd5c",
102 )
103 .unwrap(),
104 )],
105 }
106 }
107
108 pub fn preview() -> Self {
109 Self {
110 anchor_hosts: vec![(String::from("preview-node.world.dev.cardano.org."), 30002)],
111 chain_info: ChainInfo::PREVIEW,
112 net_versions: vec![Version::V6, Version::V7, Version::V8],
113 known_points: vec![(
114 505,
116 Point::from_raw(
117 86400,
118 "283b30cedbfc17647564abb43012bd2f17871ec0be748535c38a52088ced6a9f",
119 )
120 .unwrap(),
121 )],
122 }
123 }
124
125 pub fn override_hosts(mut self, v: &[(String, u16)]) -> Self {
126 self.anchor_hosts = v.to_owned();
127 self
128 }
129}
130
131async fn resolve_name(dest: &str) -> Result<Vec<IpAddr>, ()> {
132 let ip = match Ipv4Addr::from_str(dest) {
133 Ok(addr) => Some(IpAddr::V4(addr)),
134 Err(_) => match Ipv6Addr::from_str(dest) {
135 Ok(addr6) => Some(IpAddr::V6(addr6)),
136 Err(_) => None,
137 },
138 };
139
140 match ip {
141 None => {
143 let resolver =
144 TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default())
145 .unwrap();
146 let response = resolver.lookup_ip(dest).await.unwrap();
147 let addresses = response.iter().collect::<Vec<_>>();
148 Ok(addresses)
149 }
150 Some(ip) => Ok(vec![ip]),
151 }
152}
153
154async fn connect_to(
155 destinations: &[(String, u16)],
156) -> Result<(SocketAddr, TcpStream), Vec<std::io::Error>> {
157 let mut errors = Vec::new();
158
159 for (dest, port) in destinations {
160 let ip_addresses = resolve_name(dest).await.unwrap();
161
162 for ip_addr in ip_addresses {
164 let addr = SocketAddr::new(ip_addr, *port);
165 debug!("trying to connect to {}:{} ({})", dest, port, ip_addr);
166 match TcpStream::connect(&addr).await {
167 Err(e) => errors.push(e),
168 Ok(stream) => {
169 info!("connected to {}:{} ({})", dest, port, ip_addr);
170 return Ok((addr, stream));
171 }
172 }
173 }
174 }
175
176 return Err(errors);
177}
178
179pub struct NetworkHandle {
180 #[allow(dead_code)]
181 handle: tokio::task::JoinHandle<()>,
182
183 pub sockaddr: SocketAddr,
185 pub chainsync: Channel<ChainSync>,
187 pub txsubmit: Channel<TxSubmit>,
189 pub blockfetch: Channel<BlockFetch>,
191 pub stats: StreamStatsShared,
193}
194
195#[derive(Debug, Error)]
196pub enum NetworkError {
197 #[error("protocol error {0}")]
198 ProtocolError(#[from] ProtocolError),
199 #[error("handshake error {0}")]
200 HandshakeError(#[from] stream::HandshakeError),
201 #[error("Connect I/O error {0:?}")]
202 ConnectError(Vec<std::io::Error>),
203}
204
205fn channel<P: Protocol>(
206 writer: tokio::sync::mpsc::Sender<PacketBytes>,
207) -> (ChannelWriter, Channel<P>) {
208 let (sender, receiver) = tokio::sync::mpsc::channel(16);
209 let channel = RawChannel::new(P::NUMBER, writer, receiver);
210 (ChannelWriter(sender), Channel::new(channel))
211}
212
213impl NetworkHandle {
214 pub async fn start(
217 network_description: &NetworkDescription,
218 ) -> Result<NetworkHandle, NetworkError> {
219 let (sockaddr, tcp) = connect_to(&network_description.anchor_hosts)
221 .await
222 .map_err(|e| NetworkError::ConnectError(e))?;
223
224 let (tx_sender, tx_receiver) = tokio::sync::mpsc::channel(64);
225
226 let (chainsync_w, chainsync) = channel(tx_sender.clone());
227 let (txsubmit_w, txsubmit) = channel(tx_sender.clone());
228 let (blockfetch_w, blockfetch) = channel(tx_sender);
229 let channels = HashMap::from([
230 (ChainSync::NUMBER, chainsync_w),
231 (TxSubmit::NUMBER, txsubmit_w),
232 (BlockFetch::NUMBER, blockfetch_w),
233 ]);
234
235 let stats = stream::StreamStatsShared::default();
236
237 let mut stream = stream::Stream::new(stats.clone(), tcp, tx_receiver, channels);
240 stream
241 .handshake(
242 network_description.chain_info.protocol_magic,
243 DiffusionMode::InitiatorOnly,
244 &network_description.net_versions,
245 )
246 .await??;
247
248 let handle = tokio::spawn(async move { stream.process_fragment().await });
249
250 Ok(NetworkHandle {
251 sockaddr,
252 handle,
253 chainsync,
254 blockfetch,
255 txsubmit,
256 stats,
257 })
258 }
259
260 pub async fn stop(self) {
261 self.handle.abort()
262 }
263}