pipeworks_net/
bridge.rs

1// use anyhow::{Result, anyhow, bail};
2// use bitcode::{Decode, Encode};
3// use log::info;
4// use std::any::TypeId;
5// use std::collections::HashMap;
6// use std::net::SocketAddr;
7// use std::sync::Arc;
8//
9// use crate::data::bus::{BasicTypeInfo, Bus, BusEvent};
10// use crate::data::node_id::{NODE_ID, NodeId};
11// use crate::data::timestamp::Timestamp;
12// use crate::data::transport::TcpConnected;
13//
14// #[derive(Clone, Debug)]
15// pub struct PeerBridgeConnected {
16//     pub peer_socket_addr: SocketAddr,
17//     pub peer_id: NodeId,
18//     pub sending_types: Vec<BasicTypeInfo>,
19// }
20//
21// #[derive(Clone, Debug)]
22// pub struct PeerBridgeDisconnected {
23//     pub peer_socket_addr: SocketAddr,
24//     pub peer_id: NodeId,
25//     pub error: Arc<Option<anyhow::Error>>,
26// }
27//
28// pub fn do_bridge_peers(bus: Bus) -> Result<()> {
29//     let mut tcp_connected_rx = bus.subscribe::<TcpConnected>();
30//
31//     tokio::spawn(async move {
32//         while let Ok(event) = tcp_connected_rx.recv().await {
33//             tokio::spawn(handle_tcp_connection(bus.clone(), event.msg));
34//         }
35//     });
36//
37//     Ok(())
38// }
39//
40// async fn handle_tcp_connection(bus: Bus, event: TcpConnected) -> Result<()> {
41//     let Some((tx, mut rx)) = event.stream.lock().unwrap().take() else {
42//         bail!("TCP stream was already consumed by another subscriber!");
43//     };
44//
45//     #[derive(Encode, Decode)]
46//     struct PeerTypeRequest {
47//         type_name: String,
48//         short_id: u32,
49//     }
50//
51//     #[derive(Encode, Decode)]
52//     struct PeerHello {
53//         node_id: NodeId,
54//         request_types: Vec<PeerTypeRequest>,
55//     }
56//
57//     #[derive(Encode, Decode)]
58//     struct EventForward {
59//         short_id: u32,
60//         remote_timestamp: Timestamp,
61//         bytes: Vec<u8>,
62//     }
63//
64//     // Get all the types the bus wants from remote nodes. The index in the array is used as their
65//     // short id. It only needs to be stable for this specific connection, on this node.
66//     let remote_type_infos: Vec<_> = bus
67//         .get_types()
68//         .into_iter()
69//         .filter(|t| t.request_from_peers)
70//         .collect();
71//
72//     // Keyed by index
73//     let short_id_to_type_ids: Vec<TypeId> = remote_type_infos
74//         .iter()
75//         .map(|type_info| type_info.type_id)
76//         .collect();
77//
78//     // Two-way handshake payload
79//     let hello = PeerHello {
80//         node_id: *NODE_ID,
81//         request_types: remote_type_infos
82//             .iter()
83//             .enumerate()
84//             .map(|(i, type_info)| PeerTypeRequest {
85//                 type_name: type_info.type_name.clone(),
86//                 short_id: i as u32,
87//             })
88//             .collect(),
89//     };
90//
91//     // Send hello
92//     tx.send(bitcode::encode(&hello)).await?;
93//
94//     // Wait for the hello handshake from the remote
95//     let bytes = rx
96//         .recv()
97//         .await
98//         .ok_or_else(|| anyhow!("Peer TCP socket closed before receiving handshake"))?;
99//     let peer_hello: PeerHello = bitcode::decode(&bytes)?;
100//
101//     let type_info_by_type_names: HashMap<String, BasicTypeInfo> = HashMap::from_iter(
102//         bus.get_types()
103//             .into_iter()
104//             .map(|t| (t.type_name.clone(), t)),
105//     );
106//
107//     let mut sending_types = vec![];
108//
109//     // Create a forward task for all the types the peer wants copies of that we also know about.
110//     for type_request in peer_hello.request_types {
111//         if let Some(type_info) = type_info_by_type_names.get(&type_request.type_name) {
112//             sending_types.push(type_info.clone());
113//
114//             let mut type_serde_rx = bus.subscribe_serde(&type_info.type_id);
115//             let tx = tx.clone();
116//
117//             tokio::spawn(async move {
118//                 while let Ok(event) = type_serde_rx.recv().await {
119//                     if !event.source.is_me() {
120//                         continue;
121//                     }
122//
123//                     tx.send(bitcode::encode(&EventForward {
124//                         short_id: type_request.short_id,
125//                         remote_timestamp: event.time.into(),
126//                         bytes: event.msg,
127//                     }))
128//                     .await
129//                     .unwrap();
130//                 }
131//             });
132//         }
133//     }
134//
135//     info!(
136//         "Peer {} bridged. Forwarding: {}",
137//         peer_hello.node_id,
138//         sending_types
139//             .clone()
140//             .into_iter()
141//             .map(|t| t.type_name)
142//             .collect::<Vec<_>>()
143//             .join(", ")
144//     );
145//
146//     // Send out the connected event
147//     bus.send(PeerBridgeConnected {
148//         peer_socket_addr: event.peer_socket_addr,
149//         peer_id: peer_hello.node_id,
150//         sending_types,
151//     });
152//
153//     // Handle all incoming messages
154//     while let Some(bytes) = rx.recv().await {
155//         let Ok(forward) = bitcode::decode::<EventForward>(&bytes) else {
156//             bail!("Malformed EventForward request from {}", peer_hello.node_id);
157//         };
158//
159//         let Some(type_id) = short_id_to_type_ids.get(forward.short_id as usize) else {
160//             bail!(
161//                 "EventForward short ID {} is out of range for peer {}",
162//                 forward.short_id,
163//                 peer_hello.node_id
164//             );
165//         };
166//
167//         bus.send_serde_event(
168//             type_id,
169//             BusEvent {
170//                 source: peer_hello.node_id,
171//                 time: forward.remote_timestamp.into(),
172//                 msg: forward.bytes,
173//             },
174//         );
175//     }
176//
177//     Ok(())
178// }