// pipeworks_net/bridge.rs
1// use anyhow::{Result, anyhow, bail};
2// use bitcode::{Decode, Encode};
3// use log::info;
4// use std::any::TypeId;
5// use std::collections::HashMap;
6// use std::net::SocketAddr;
7// use std::sync::Arc;
8//
9// use crate::data::bus::{BasicTypeInfo, Bus, BusEvent};
10// use crate::data::node_id::{NODE_ID, NodeId};
11// use crate::data::timestamp::Timestamp;
12// use crate::data::transport::TcpConnected;
13//
// /// Bus event sent by `handle_tcp_connection` once the hello handshake with a
// /// peer has completed and the forwarding tasks have been set up.
// #[derive(Debug, Clone)]
// pub struct PeerBridgeConnected {
//     /// Socket address of the peer, copied from the originating `TcpConnected` event.
//     pub peer_socket_addr: SocketAddr,
//     /// Node id the peer reported in its hello message.
//     pub peer_id: NodeId,
//     /// Types this node forwards to the peer: those the peer requested by name
//     /// that are also known to the local bus.
//     pub sending_types: Vec<BasicTypeInfo>,
// }
20//
// /// Bus event describing a peer bridge that has gone away.
// ///
// /// NOTE(review): nothing in this file constructs or sends this event;
// /// presumably it is meant to be emitted when `handle_tcp_connection`
// /// finishes. Confirm against the rest of the crate.
// #[derive(Debug, Clone)]
// pub struct PeerBridgeDisconnected {
//     /// Socket address of the peer.
//     pub peer_socket_addr: SocketAddr,
//     /// Node id of the peer.
//     pub peer_id: NodeId,
//     /// Presumably the reason the bridge ended, with `None` for a clean close
//     /// (TODO confirm). Held in an `Arc`, presumably so the struct can derive
//     /// `Clone` even though `anyhow::Error` is not `Clone`.
//     pub error: Arc<Option<anyhow::Error>>,
// }
27//
// /// Starts peer bridging: subscribes to `TcpConnected` events on `bus` and
// /// spawns one `handle_tcp_connection` task per event received.
// ///
// /// Must be called from within a Tokio runtime, since it calls `tokio::spawn`.
// /// The listener task is detached; this function returns immediately.
// ///
// /// # Errors
// ///
// /// Currently always returns `Ok(())`. Failures of individual connections are
// /// logged by the per-connection task instead of being returned.
// pub fn do_bridge_peers(bus: Bus) -> Result<()> {
//     let mut tcp_connected_rx = bus.subscribe::<TcpConnected>();
//
//     tokio::spawn(async move {
//         // NOTE(review): the loop stops on the first `Err` from `recv()`. If the
//         // receiver can report a recoverable error (e.g. lagging behind), bridging
//         // of new connections stops permanently at that point -- confirm intent.
//         while let Ok(event) = tcp_connected_rx.recv().await {
//             let bus = bus.clone();
//             tokio::spawn(async move {
//                 // Without this, the `Result` of a detached task is dropped and
//                 // every bridge failure would vanish silently.
//                 if let Err(error) = handle_tcp_connection(bus, event.msg).await {
//                     log::warn!("Peer bridge connection ended with error: {error:#}");
//                 }
//             });
//         }
//     });
//
//     Ok(())
// }
39//
// /// Runs the peer bridge over a single TCP connection.
// ///
// /// Steps, in order:
// /// 1. Take the `(tx, rx)` channel pair out of `event.stream`.
// /// 2. Exchange `PeerHello` messages (bitcode-encoded): each side sends its node
// ///    id and the list of types it wants, each tagged with a per-connection
// ///    `short_id`.
// /// 3. For every type the peer requested that the local bus also knows, spawn a
// ///    task that forwards serialized bus events of that type to the peer.
// /// 4. Send `PeerBridgeConnected` on the bus.
// /// 5. Decode incoming `EventForward` frames and republish them on the local bus
// ///    until the receive channel closes.
// ///
// /// All forwarding tasks are aborted before this function returns from step 5.
// ///
// /// # Errors
// ///
// /// Returns an error if the stream was already taken, if sending or receiving
// /// the hello fails, if the hello or a forwarded frame cannot be decoded, if a
// /// frame carries a `short_id` this node never handed out, or if the number of
// /// requested types does not fit in a `u32`.
// ///
// /// # Panics
// ///
// /// Panics if the mutex guarding `event.stream` is poisoned.
// async fn handle_tcp_connection(bus: Bus, event: TcpConnected) -> Result<()> {
//     // The channel pair sits in a shared `Option`; whoever takes it first owns
//     // the connection.
//     let Some((tx, mut rx)) = event.stream.lock().unwrap().take() else {
//         bail!("TCP stream was already consumed by another subscriber!");
//     };
//
//     // Wire messages, private to this function.
//
//     /// One type a node asks its peer to forward, with the id to tag it with.
//     #[derive(Encode, Decode)]
//     struct PeerTypeRequest {
//         type_name: String,
//         short_id: u32,
//     }
//
//     /// Handshake message sent by both sides.
//     #[derive(Encode, Decode)]
//     struct PeerHello {
//         node_id: NodeId,
//         request_types: Vec<PeerTypeRequest>,
//     }
//
//     /// One forwarded bus event; `bytes` is the already-serialized payload.
//     #[derive(Encode, Decode)]
//     struct EventForward {
//         short_id: u32,
//         remote_timestamp: Timestamp,
//         bytes: Vec<u8>,
//     }
//
//     // Get all the types the bus wants from remote nodes. The index in the array is used as their
//     // short id. It only needs to be stable for this specific connection, on this node.
//     let remote_type_infos: Vec<_> = bus
//         .get_types()
//         .into_iter()
//         .filter(|t| t.request_from_peers)
//         .collect();
//
//     // Keyed by index (== short id).
//     let short_id_to_type_ids: Vec<TypeId> = remote_type_infos
//         .iter()
//         .map(|type_info| type_info.type_id)
//         .collect();
//
//     // Checked conversion instead of `as u32`, so an index that does not fit is
//     // reported rather than silently truncated into a colliding short id.
//     let request_types = remote_type_infos
//         .iter()
//         .enumerate()
//         .map(|(i, type_info)| {
//             u32::try_from(i).map(|short_id| PeerTypeRequest {
//                 type_name: type_info.type_name.clone(),
//                 short_id,
//             })
//         })
//         .collect::<Result<Vec<_>, _>>()?;
//
//     // Two-way handshake payload
//     let hello = PeerHello {
//         node_id: *NODE_ID,
//         request_types,
//     };
//
//     // Send hello
//     tx.send(bitcode::encode(&hello)).await?;
//
//     // Wait for the hello handshake from the remote
//     let bytes = rx
//         .recv()
//         .await
//         .ok_or_else(|| anyhow!("Peer TCP socket closed before receiving handshake"))?;
//     let peer_hello: PeerHello = bitcode::decode(&bytes)?;
//
//     // Copied out up front because `peer_hello.request_types` is moved below.
//     let peer_id = peer_hello.node_id;
//
//     let type_info_by_type_names: HashMap<String, BasicTypeInfo> = bus
//         .get_types()
//         .into_iter()
//         .map(|t| (t.type_name.clone(), t))
//         .collect();
//
//     let mut sending_types = Vec::new();
//     // Handles of the forwarding tasks, kept so they can be stopped when this
//     // connection ends.
//     let mut forward_tasks = Vec::new();
//
//     // Create a forward task for all the types the peer wants copies of that we also know about.
//     // Requests for unknown type names are ignored.
//     for type_request in peer_hello.request_types {
//         let Some(type_info) = type_info_by_type_names.get(&type_request.type_name) else {
//             continue;
//         };
//         sending_types.push(type_info.clone());
//
//         let mut type_serde_rx = bus.subscribe_serde(&type_info.type_id);
//         let tx = tx.clone();
//         // The peer chose this id; echo it back so the peer can map the frame to a type.
//         let short_id = type_request.short_id;
//
//         forward_tasks.push(tokio::spawn(async move {
//             while let Ok(event) = type_serde_rx.recv().await {
//                 // Only forward events for which `source.is_me()` holds -- presumably
//                 // the locally-originated ones, so events received from other peers
//                 // are not echoed back out (confirm against `NodeId::is_me`).
//                 if !event.source.is_me() {
//                     continue;
//                 }
//
//                 let frame = bitcode::encode(&EventForward {
//                     short_id,
//                     remote_timestamp: event.time.into(),
//                     bytes: event.msg,
//                 });
//
//                 // A failed send means the connection is gone; stop quietly
//                 // instead of panicking the task.
//                 if tx.send(frame).await.is_err() {
//                     break;
//                 }
//             }
//         }));
//     }
//
//     info!(
//         "Peer {} bridged. Forwarding: {}",
//         peer_id,
//         sending_types
//             .iter()
//             .map(|t| t.type_name.as_str())
//             .collect::<Vec<_>>()
//             .join(", ")
//     );
//
//     // Send out the connected event
//     bus.send(PeerBridgeConnected {
//         peer_socket_addr: event.peer_socket_addr,
//         peer_id,
//         sending_types,
//     });
//
//     // Handle all incoming messages. Wrapped in an async block so that every exit
//     // path (clean close or `bail!`) falls through to the cleanup below.
//     let outcome: Result<()> = async {
//         while let Some(bytes) = rx.recv().await {
//             let Ok(forward) = bitcode::decode::<EventForward>(&bytes) else {
//                 bail!("Malformed EventForward request from {}", peer_id);
//             };
//
//             // Short ids are indices into the list this node sent in its hello.
//             let Some(type_id) = short_id_to_type_ids.get(forward.short_id as usize) else {
//                 bail!(
//                     "EventForward short ID {} is out of range for peer {}",
//                     forward.short_id,
//                     peer_id
//                 );
//             };
//
//             bus.send_serde_event(
//                 type_id,
//                 BusEvent {
//                     source: peer_id,
//                     time: forward.remote_timestamp.into(),
//                     msg: forward.bytes,
//                 },
//             );
//         }
//
//         Ok(())
//     }
//     .await;
//
//     // Stop the forwarders; otherwise they would keep their bus subscriptions and
//     // `tx` clones alive until the next event of their type shows up.
//     // NOTE(review): `PeerBridgeDisconnected` is never sent anywhere in this file;
//     // this looks like the natural place to emit it -- confirm.
//     for task in &forward_tasks {
//         task.abort();
//     }
//
//     outcome
// }