reticulum-rs 0.1.3

Reticulum-rs is a Rust implementation of the Reticulum Network Stack - a cryptographic, decentralised, and resilient mesh networking protocol designed for communication over any physical layer. This project is open source and community-owned, focused on bringing Reticulum capabilities to the Rust ecosystem with clear APIs, reproducible behavior, and portable deployment options.
Documentation
use super::jobs::manage_transport;
use super::wire::handle_inbound_packet_for_test;
use super::*;

impl Transport {
    /// Creates a transport from `config` and starts its background tasks.
    ///
    /// Besides building the shared [`TransportHandler`] state, this spawns two
    /// detached tasks:
    ///
    /// * `manage_transport`, which is handed the handler, the interface
    ///   manager's receiver and the sender side of the channel exposed by
    ///   [`Transport::iface_rx`];
    /// * a forwarder that re-publishes every inbound `LinkEvent::Data` payload
    ///   as a `ReceivedData` value (with `ratchet_used` set to `false`) on the
    ///   received-data channel.
    ///
    /// A ratchet store is only created when `config.ratchet_store_path` is
    /// set; expired entries are cleaned from it immediately. The transport id
    /// passed to the path-request state is the identity's address hash when
    /// `config.retransmit` is enabled and `None` otherwise.
    ///
    /// # Panics
    ///
    /// Uses `tokio::spawn`, which panics when called outside a Tokio runtime.
    pub fn new(config: TransportConfig) -> Self {
        // Single capacity shared by every broadcast channel created below.
        const CHANNEL_CAPACITY: usize = 16;

        let (announce_tx, _) = tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
        let (link_in_event_tx, _) = tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
        let (link_out_event_tx, _) = tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
        let (received_data_tx, _) = tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
        let (iface_messages_tx, _) = tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
        let (resource_events_tx, _) = tokio::sync::broadcast::channel(CHANNEL_CAPACITY);

        // The receiver has to be taken before the manager is moved behind the
        // shared mutex.
        let iface_manager = InterfaceManager::new(16);
        let rx_receiver = iface_manager.receiver();
        let iface_manager = Arc::new(Mutex::new(iface_manager));

        let ratchet_store = config.ratchet_store_path.as_ref().map(|path| {
            let mut store = RatchetStore::new(path.clone());
            store.clean_expired(now_secs());
            store
        });

        let transport_id = config
            .retransmit
            .then(|| *config.identity.address_hash());
        let path_requests = PathRequests::new(
            config.name.as_str(),
            transport_id,
            config.announce_queue_len,
            config.announce_cap,
            config.path_request_timeout_secs,
        );

        let path_request_dest = create_path_request_destination().desc.address_hash;

        let cancel = CancellationToken::new();
        let name = config.name.clone();
        let handler = Arc::new(Mutex::new(TransportHandler {
            iface_manager: iface_manager.clone(),
            announce_table: AnnounceTable::new(
                config.announce_cache_capacity,
                config.announce_retry_limit,
            ),
            link_table: LinkTable::new(
                Duration::from_secs(config.link_proof_timeout_secs),
                Duration::from_secs(config.link_idle_timeout_secs),
            ),
            path_table: PathTable::new(),
            single_in_destinations: HashMap::new(),
            single_out_destinations: HashMap::new(),
            announce_limits: AnnounceLimits::new(),
            out_links: HashMap::new(),
            in_links: HashMap::new(),
            packet_cache: Mutex::new(PacketCache::new()),
            path_requests,
            announce_tx,
            link_in_event_tx: link_in_event_tx.clone(),
            received_data_tx: received_data_tx.clone(),
            ratchet_store,
            resource_manager: ResourceManager::new_with_config(
                Duration::from_secs(config.resource_retry_interval_secs),
                config.resource_retry_limit,
            ),
            resource_events_tx: resource_events_tx.clone(),
            fixed_dest_path_requests: path_request_dest,
            cancel: cancel.clone(),
            receipt_handler: None,
            // `config` is listed last so the constructors above can still read
            // its fields before it is moved into the handler.
            config,
        }));

        // Detached task: the join handle is intentionally dropped.
        tokio::spawn(manage_transport(
            handler.clone(),
            rx_receiver,
            iface_messages_tx.clone(),
        ));

        // Detached task that mirrors inbound link data onto the received-data
        // channel. Subscribing happens here, before the task is spawned.
        let mut link_rx = link_in_event_tx.subscribe();
        let data_tx = received_data_tx.clone();
        tokio::spawn(async move {
            loop {
                let event = match link_rx.recv().await {
                    Ok(event) => event,
                    // Skipped events are gone for good; keep consuming.
                    Err(broadcast::error::RecvError::Lagged(_)) => continue,
                    // Every sender was dropped, nothing more will arrive.
                    Err(broadcast::error::RecvError::Closed) => break,
                };

                if let LinkEvent::Data(payload) = event.event {
                    // A send error only means nobody is subscribed right now.
                    let _ = data_tx.send(ReceivedData {
                        destination: event.address_hash,
                        data: PacketDataBuffer::new_from_slice(payload.as_slice()),
                        ratchet_used: false,
                    });
                }
            }
        });

        Self {
            name,
            iface_manager,
            link_in_event_tx,
            link_out_event_tx,
            received_data_tx,
            iface_messages_tx,
            resource_events_tx,
            handler,
            cancel,
        }
    }

    /// Routes `packet` using the path table.
    ///
    /// The path table is asked to handle the packet first. When it names an
    /// interface the (possibly rewritten) packet is sent directly to it.
    /// Otherwise the packet is broadcast if `config.broadcast` is enabled, and
    /// dropped with a trace log if it is not.
    pub async fn outbound(&self, packet: &Packet) {
        // The handler guard is a temporary and is released at the end of this
        // statement, so `send_direct` below can lock the handler again.
        let (packet, maybe_iface) = self.handler.lock().await.path_table.handle_packet(packet);

        match maybe_iface {
            Some(iface) => {
                self.send_direct(iface, packet).await;
                log::trace!("Sent outbound packet to {}", iface);
            }
            None => {
                let handler = self.handler.lock().await;
                if handler.config.broadcast {
                    handler
                        .send(TxMessage {
                            tx_type: TxMessageType::Broadcast(None),
                            packet,
                        })
                        .await;
                } else {
                    log::trace!(
                        "tp({}): no route for outbound packet dst={}",
                        self.name,
                        packet.destination
                    );
                }
            }
        }
    }

    /// Returns a new handle to the shared interface manager.
    pub fn iface_manager(&self) -> Arc<Mutex<InterfaceManager>> {
        self.iface_manager.clone()
    }

    /// Subscribes to the interface message channel whose sender was handed to
    /// the `manage_transport` task in [`Transport::new`].
    ///
    /// Only messages sent after this call are delivered to the new receiver.
    pub fn iface_rx(&self) -> broadcast::Receiver<RxMessage> {
        self.iface_messages_tx.subscribe()
    }

    /// Subscribes to the resource event channel shared with the handler.
    ///
    /// Only events sent after this call are delivered to the new receiver.
    pub fn resource_events(&self) -> broadcast::Receiver<ResourceEvent> {
        self.resource_events_tx.subscribe()
    }

    /// Subscribes to the handler's announce channel.
    ///
    /// This is `async` because the sender lives inside the handler and the
    /// handler lock has to be acquired to reach it.
    pub async fn recv_announces(&self) -> broadcast::Receiver<AnnounceEvent> {
        self.handler.lock().await.announce_tx.subscribe()
    }

    /// Locks the handler and forwards `packet` to its `send_packet`.
    pub async fn send_packet(&self, packet: Packet) {
        self.handler.lock().await.send_packet(packet).await;
    }

    /// Locks the handler and forwards `packet` to its
    /// `send_packet_with_outcome`, returning the reported outcome.
    pub async fn send_packet_with_outcome(&self, packet: Packet) -> SendPacketOutcome {
        self.handler
            .lock()
            .await
            .send_packet_with_outcome(packet)
            .await
    }

    /// Locks the handler and forwards `packet` to its
    /// `send_packet_with_trace`, returning the reported trace.
    pub async fn send_packet_with_trace(&self, packet: Packet) -> SendPacketTrace {
        self.handler
            .lock()
            .await
            .send_packet_with_trace(packet)
            .await
    }

    /// Builds an announce packet for `destination` and sends it.
    ///
    /// `app_data` is passed through to the destination's `announce` call, as
    /// is a fresh `OsRng`.
    ///
    /// # Panics
    ///
    /// Panics if the destination fails to produce an announce packet.
    pub async fn send_announce(
        &self,
        destination: &Arc<Mutex<SingleInputDestination>>,
        app_data: Option<&[u8]>,
    ) {
        // The destination guard is a temporary that is released at the end of
        // this statement, so the destination lock and the handler lock are
        // never held together. Holding both would risk a lock-order inversion
        // with any task that locks the handler first and a destination second.
        let packet = destination
            .lock()
            .await
            .announce(OsRng, app_data)
            .expect("valid announce packet");

        self.handler.lock().await.send_packet(packet).await;
    }

    /// Installs `handler` as the receipt handler, replacing any previous one.
    pub async fn set_receipt_handler(&mut self, handler: Box<dyn ReceiptHandler>) {
        self.handler.lock().await.receipt_handler = Some(Arc::from(handler));
    }

    /// Test helper: passes `receipt` to the installed receipt handler.
    ///
    /// The handler lock is only tried, never waited for. If it is currently
    /// held elsewhere, or no receipt handler is installed, the receipt is not
    /// delivered. The lock is released before the callback runs.
    pub fn emit_receipt_for_test(&self, receipt: DeliveryReceipt) {
        let receipt_handler = match self.handler.try_lock() {
            Ok(guard) => guard.receipt_handler.clone(),
            Err(_) => {
                log::trace!(
                    "tp({}): handler lock busy, receipt not emitted",
                    self.name
                );
                None
            }
        };

        if let Some(handler) = receipt_handler {
            handler.on_receipt(&receipt);
        }
    }

    /// Test helper: runs `packet` through `handle_inbound_packet_for_test` and
    /// reports a resulting receipt, if any, to the installed receipt handler.
    ///
    /// The handler lock is released before the callback runs.
    pub async fn handle_inbound_for_test(&self, packet: Packet) {
        let (receipt, receipt_handler) = {
            let mut guard = self.handler.lock().await;
            let receipt = handle_inbound_packet_for_test(&packet, &mut guard);
            (receipt, guard.receipt_handler.clone())
        };

        if let (Some(receipt), Some(handler)) = (receipt, receipt_handler) {
            handler.on_receipt(&receipt);
        }
    }

    /// Sends `packet` as a `TxMessageType::Broadcast` message.
    ///
    /// `from_iface` is passed through unchanged as the broadcast's interface
    /// argument.
    pub async fn send_broadcast(&self, packet: Packet, from_iface: Option<AddressHash>) {
        let message = TxMessage {
            tx_type: TxMessageType::Broadcast(from_iface),
            packet,
        };
        self.handler.lock().await.send(message).await;
    }

    /// Sends `packet` as a `TxMessageType::Direct` message addressed to `addr`.
    pub async fn send_direct(&self, addr: AddressHash, packet: Packet) {
        let message = TxMessage {
            tx_type: TxMessageType::Direct(addr),
            packet,
        };
        self.handler.lock().await.send(message).await;
    }
}