iroh_lan/
direct_connect.rs

1use actor_helper::{Action, Handle, act};
2use anyhow::Result;
3use iroh::{NodeId, endpoint::Connection, protocol::ProtocolHandler};
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, hash_map::Entry};
6use tracing::{debug, info};
7
8use crate::{connection::Conn, local_networking::Ipv4Pkg};
9
10#[derive(Debug, Clone)]
11pub struct Direct {
12    api: Handle<DirectActor, anyhow::Error>,
13}
14
15#[derive(Debug)]
16struct DirectActor {
17    peers: HashMap<NodeId, Conn>,
18    endpoint: iroh::endpoint::Endpoint,
19    rx: actor_helper::Receiver<Action<DirectActor>>,
20    direct_connect_tx: tokio::sync::broadcast::Sender<DirectMessage>,
21}
22
23#[derive(Debug, Serialize, Deserialize, Clone)]
24pub enum DirectMessage {
25    IpPacket(Ipv4Pkg),
26    IDontLikeWarnings,
27}
28
29impl Direct {
30    pub const ALPN: &[u8] = b"/iroh/lan-direct/1";
31    pub fn new(
32        endpoint: iroh::endpoint::Endpoint,
33        direct_connect_tx: tokio::sync::broadcast::Sender<DirectMessage>,
34    ) -> Self {
35        let (api, rx) = Handle::channel();
36        let mut actor = DirectActor {
37            peers: HashMap::new(),
38            endpoint,
39            rx,
40            direct_connect_tx,
41        };
42        tokio::spawn(async move { actor.run().await });
43        Self { api }
44    }
45
46    pub async fn handle_connection(&self, conn: Connection) -> Result<()> {
47        self.api
48            .call(act!(actor => actor.handle_connection(conn)))
49            .await
50    }
51
52    pub async fn route_packet(&self, to: NodeId, pkg: DirectMessage) -> Result<()> {
53        self.api
54            .call(act!(actor => actor.route_packet(to, pkg)))
55            .await
56    }
57
58    pub async fn get_endpoint(&self) -> iroh::endpoint::Endpoint {
59        self.api
60            .call(act!(actor => actor.get_endpoint()))
61            .await
62            .unwrap()
63    }
64
65    pub async fn get_conn_state(&self, node_id: NodeId) -> Result<crate::connection::ConnState> {
66        self.api
67            .call(act!(actor => actor.get_conn_state(node_id)))
68            .await
69    }
70
71    pub async fn close(&self) -> Result<()> {
72        self.api.call(act!(actor => actor.close())).await
73    }
74}
75
76impl DirectActor {
77    async fn run(&mut self) {
78        loop {
79            tokio::select! {
80                Ok(action) = self.rx.recv_async() => {
81                    action(self).await;
82                }
83                _ = tokio::signal::ctrl_c() => {
84                    break
85                }
86            }
87        }
88    }
89
90    async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) -> Result<()> {
91        info!("New direct connection from {:?}", conn.remote_node_id()?);
92        let remote_node_id = conn.remote_node_id()?;
93
94        match self.peers.entry(remote_node_id) {
95            Entry::Occupied(mut entry) => {
96                entry.get_mut().incoming_connection(conn, true).await?;
97            }
98            Entry::Vacant(entry) => {
99                let (send_stream, recv_stream) = conn.accept_bi().await?;
100                entry.insert(
101                    Conn::new(
102                        self.endpoint.clone(),
103                        conn,
104                        send_stream,
105                        recv_stream,
106                        self.direct_connect_tx.clone(),
107                    )
108                    .await?,
109                );
110            }
111        }
112
113        Ok(())
114    }
115
116    async fn route_packet(&mut self, to: NodeId, pkg: DirectMessage) -> Result<()> {
117        match self.peers.entry(to) {
118            Entry::Occupied(entry) => {
119                if entry.get().get_state().await == crate::connection::ConnState::Closed {
120                    debug!("Connection to peer {} closed, removing", to);
121                    entry.remove();
122                    return Err(anyhow::anyhow!("connection to peer is not running"));
123                }
124
125                entry.get().write(pkg).await?;
126            }
127            Entry::Vacant(entry) => {
128                let conn =
129                    Conn::connect(self.endpoint.clone(), to, self.direct_connect_tx.clone()).await;
130
131                conn.write(pkg).await?;
132                entry.insert(conn);
133            }
134        }
135
136        Ok(())
137    }
138
139    pub async fn get_conn_state(&self, node_id: NodeId) -> Result<crate::connection::ConnState> {
140        Ok(self
141            .peers
142            .get(&node_id)
143            .cloned()
144            .ok_or(anyhow::anyhow!("no connection to peer"))?
145            .get_state()
146            .await)
147    }
148
149    pub async fn get_endpoint(&self) -> Result<iroh::endpoint::Endpoint> {
150        Ok(self.endpoint.clone())
151    }
152
153    pub async fn close(&mut self) -> Result<()> {
154        for (_, conn) in self.peers.drain() {
155            let _ = conn.close().await;
156        }
157        Ok(())
158    }
159}
160
161impl ProtocolHandler for Direct {
162    async fn accept(
163        &self,
164        connection: iroh::endpoint::Connection,
165    ) -> Result<(), iroh::protocol::AcceptError> {
166        let _ = self.handle_connection(connection).await;
167        Ok(())
168    }
169}