iroh_lan/
direct_connect.rs1use actor_helper::{Action, Handle, act};
2use anyhow::Result;
3use iroh::{NodeId, endpoint::Connection, protocol::ProtocolHandler};
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, hash_map::Entry};
6use tracing::{debug, info};
7
8use crate::{connection::Conn, local_networking::Ipv4Pkg};
9
10#[derive(Debug, Clone)]
11pub struct Direct {
12 api: Handle<DirectActor, anyhow::Error>,
13}
14
15#[derive(Debug)]
16struct DirectActor {
17 peers: HashMap<NodeId, Conn>,
18 endpoint: iroh::endpoint::Endpoint,
19 rx: actor_helper::Receiver<Action<DirectActor>>,
20 direct_connect_tx: tokio::sync::broadcast::Sender<DirectMessage>,
21}
22
23#[derive(Debug, Serialize, Deserialize, Clone)]
24pub enum DirectMessage {
25 IpPacket(Ipv4Pkg),
26 IDontLikeWarnings,
27}
28
29impl Direct {
30 pub const ALPN: &[u8] = b"/iroh/lan-direct/1";
31 pub fn new(
32 endpoint: iroh::endpoint::Endpoint,
33 direct_connect_tx: tokio::sync::broadcast::Sender<DirectMessage>,
34 ) -> Self {
35 let (api, rx) = Handle::channel();
36 let mut actor = DirectActor {
37 peers: HashMap::new(),
38 endpoint,
39 rx,
40 direct_connect_tx,
41 };
42 tokio::spawn(async move { actor.run().await });
43 Self { api }
44 }
45
46 pub async fn handle_connection(&self, conn: Connection) -> Result<()> {
47 self.api
48 .call(act!(actor => actor.handle_connection(conn)))
49 .await
50 }
51
52 pub async fn route_packet(&self, to: NodeId, pkg: DirectMessage) -> Result<()> {
53 self.api
54 .call(act!(actor => actor.route_packet(to, pkg)))
55 .await
56 }
57
58 pub async fn get_endpoint(&self) -> iroh::endpoint::Endpoint {
59 self.api
60 .call(act!(actor => actor.get_endpoint()))
61 .await
62 .unwrap()
63 }
64
65 pub async fn get_conn_state(&self, node_id: NodeId) -> Result<crate::connection::ConnState> {
66 self.api
67 .call(act!(actor => actor.get_conn_state(node_id)))
68 .await
69 }
70
71 pub async fn close(&self) -> Result<()> {
72 self.api.call(act!(actor => actor.close())).await
73 }
74}
75
76impl DirectActor {
77 async fn run(&mut self) {
78 loop {
79 tokio::select! {
80 Ok(action) = self.rx.recv_async() => {
81 action(self).await;
82 }
83 _ = tokio::signal::ctrl_c() => {
84 break
85 }
86 }
87 }
88 }
89
90 async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) -> Result<()> {
91 info!("New direct connection from {:?}", conn.remote_node_id()?);
92 let remote_node_id = conn.remote_node_id()?;
93
94 match self.peers.entry(remote_node_id) {
95 Entry::Occupied(mut entry) => {
96 entry.get_mut().incoming_connection(conn, true).await?;
97 }
98 Entry::Vacant(entry) => {
99 let (send_stream, recv_stream) = conn.accept_bi().await?;
100 entry.insert(
101 Conn::new(
102 self.endpoint.clone(),
103 conn,
104 send_stream,
105 recv_stream,
106 self.direct_connect_tx.clone(),
107 )
108 .await?,
109 );
110 }
111 }
112
113 Ok(())
114 }
115
116 async fn route_packet(&mut self, to: NodeId, pkg: DirectMessage) -> Result<()> {
117 match self.peers.entry(to) {
118 Entry::Occupied(entry) => {
119 if entry.get().get_state().await == crate::connection::ConnState::Closed {
120 debug!("Connection to peer {} closed, removing", to);
121 entry.remove();
122 return Err(anyhow::anyhow!("connection to peer is not running"));
123 }
124
125 entry.get().write(pkg).await?;
126 }
127 Entry::Vacant(entry) => {
128 let conn =
129 Conn::connect(self.endpoint.clone(), to, self.direct_connect_tx.clone()).await;
130
131 conn.write(pkg).await?;
132 entry.insert(conn);
133 }
134 }
135
136 Ok(())
137 }
138
139 pub async fn get_conn_state(&self, node_id: NodeId) -> Result<crate::connection::ConnState> {
140 Ok(self
141 .peers
142 .get(&node_id)
143 .cloned()
144 .ok_or(anyhow::anyhow!("no connection to peer"))?
145 .get_state()
146 .await)
147 }
148
149 pub async fn get_endpoint(&self) -> Result<iroh::endpoint::Endpoint> {
150 Ok(self.endpoint.clone())
151 }
152
153 pub async fn close(&mut self) -> Result<()> {
154 for (_, conn) in self.peers.drain() {
155 let _ = conn.close().await;
156 }
157 Ok(())
158 }
159}
160
161impl ProtocolHandler for Direct {
162 async fn accept(
163 &self,
164 connection: iroh::endpoint::Connection,
165 ) -> Result<(), iroh::protocol::AcceptError> {
166 let _ = self.handle_connection(connection).await;
167 Ok(())
168 }
169}