reticulum-rs 0.1.3

Reticulum-rs is a Rust implementation of the Reticulum Network Stack - a cryptographic, decentralised, and resilient mesh networking protocol designed for communication over any physical layer. This project is open source and community-owned, focused on bringing Reticulum capabilities to the Rust ecosystem with clear APIs, reproducible behavior, and portable deployment options.
Documentation
use super::path::send_to_next_hop;
use super::*;

pub(super) async fn handle_proof(packet: Packet, handler: Arc<Mutex<TransportHandler>>) {
    if packet.context == PacketContext::ResourceProof
        && packet.header.destination_type == DestinationType::Link
    {
        let mut handler = handler.lock().await;
        let mut link = handler
            .in_links
            .get(&packet.destination)
            .cloned()
            .or_else(|| handler.out_links.get(&packet.destination).cloned());
        if link.is_none() {
            for candidate in handler.out_links.values() {
                if *candidate.lock().await.id() == packet.destination {
                    link = Some(candidate.clone());
                    break;
                }
            }
        }
        if let Some(link) = link {
            let mut link = link.lock().await;
            let responses = handler.resource_manager.handle_packet(&packet, &mut link);
            let events = handler.resource_manager.drain_events();
            drop(link);
            for response in responses {
                handler.send_packet(response).await;
            }
            for event in events {
                let _ = handler.resource_events_tx.send(event);
            }
        }
        return;
    }
    eprintln!(
        "[tp] proof dst={} ctx={:02x}",
        packet.destination, packet.context as u8
    );
    let receipt_hash =
        if packet.context != PacketContext::LinkRequestProof && packet.data.len() >= HASH_SIZE {
            let mut hash = [0u8; HASH_SIZE];
            hash.copy_from_slice(&packet.data.as_slice()[..HASH_SIZE]);
            Some(hash)
        } else {
            None
        };
    if let Some(receipt_hash) = receipt_hash {
        let receipt = DeliveryReceipt::new(receipt_hash);
        let receipt_handler = {
            let handler = handler.lock().await;
            log::trace!(
                "tp({}): handle proof for {}",
                handler.config.name,
                packet.destination
            );
            handler.receipt_handler.clone()
        };

        if let Some(receipt_handler) = receipt_handler {
            receipt_handler.on_receipt(&receipt);
        }
    }

    let mut handler = handler.lock().await;

    let mut rtt_packets = Vec::new();
    for link in handler.out_links.values() {
        let mut link = link.lock().await;
        if let LinkHandleResult::Activated = link.handle_packet(&packet) {
            rtt_packets.push(link.create_rtt());
        }
    }
    for packet in rtt_packets {
        handler.send_packet(packet).await;
    }

    let maybe_packet = handler.link_table.handle_proof(&packet);

    if let Some((packet, iface)) = maybe_packet {
        handler
            .send(TxMessage {
                tx_type: TxMessageType::Direct(iface),
                packet,
            })
            .await;
    }
}

pub(super) fn handle_inbound_packet_for_test(
    packet: &Packet,
    _handler: &mut MutexGuard<'_, TransportHandler>,
) -> Option<DeliveryReceipt> {
    match packet.header.packet_type {
        PacketType::Proof => {
            if packet.context != PacketContext::LinkRequestProof && packet.data.len() >= HASH_SIZE {
                let mut hash = [0u8; HASH_SIZE];
                hash.copy_from_slice(&packet.data.as_slice()[..HASH_SIZE]);
                Some(DeliveryReceipt::new(hash))
            } else {
                None
            }
        }
        _ => None,
    }
}

pub(super) async fn handle_keepalive_response<'a>(
    packet: &Packet,
    handler: &mut MutexGuard<'a, TransportHandler>,
) -> bool {
    if packet.context == PacketContext::KeepAlive
        && packet.data.as_slice()[0] == KEEP_ALIVE_RESPONSE
    {
        let lookup = handler.link_table.handle_keepalive(packet);

        if let Some((propagated, iface)) = lookup {
            handler
                .send(TxMessage {
                    tx_type: TxMessageType::Direct(iface),
                    packet: propagated,
                })
                .await;
        }

        return true;
    }

    false
}

pub(super) fn should_encrypt_packet(packet: &Packet) -> bool {
    if packet.header.packet_type != PacketType::Data {
        return false;
    }
    if packet.header.destination_type != DestinationType::Single {
        return false;
    }
    !matches!(
        packet.context,
        PacketContext::Resource
            | PacketContext::ResourceAdvrtisement
            | PacketContext::ResourceRequest
            | PacketContext::ResourceHashUpdate
            | PacketContext::ResourceProof
            | PacketContext::ResourceInitiatorCancel
            | PacketContext::ResourceReceiverCancel
            | PacketContext::KeepAlive
            | PacketContext::CacheRequest
    )
}

pub(super) async fn handle_data<'a>(
    packet: &Packet,
    mut handler: MutexGuard<'a, TransportHandler>,
) {
    let mut data_handled = false;

    if packet.header.destination_type == DestinationType::Link {
        if matches!(
            packet.context,
            PacketContext::Resource
                | PacketContext::ResourceAdvrtisement
                | PacketContext::ResourceRequest
                | PacketContext::ResourceHashUpdate
                | PacketContext::ResourceProof
                | PacketContext::ResourceInitiatorCancel
                | PacketContext::ResourceReceiverCancel
        ) {
            let mut link = handler
                .in_links
                .get(&packet.destination)
                .cloned()
                .or_else(|| handler.out_links.get(&packet.destination).cloned());
            if link.is_none() {
                for candidate in handler.out_links.values() {
                    if *candidate.lock().await.id() == packet.destination {
                        link = Some(candidate.clone());
                        break;
                    }
                }
            }

            if let Some(link) = link {
                let mut link = link.lock().await;
                let needs_decrypt = matches!(
                    packet.context,
                    PacketContext::ResourceAdvrtisement
                        | PacketContext::ResourceRequest
                        | PacketContext::ResourceHashUpdate
                        | PacketContext::ResourceInitiatorCancel
                        | PacketContext::ResourceReceiverCancel
                );
                let packet_for_manager = if needs_decrypt {
                    let mut buffer = PacketDataBuffer::new();
                    let plain_len =
                        match link.decrypt(packet.data.as_slice(), buffer.accuire_buf_max()) {
                            Ok(plain) => plain.len(),
                            Err(err) => {
                                log::warn!("resource: failed to decrypt packet: {:?}", err);
                                return;
                            }
                        };
                    buffer.resize(plain_len);
                    let mut plain_packet = *packet;
                    plain_packet.data = buffer;
                    plain_packet
                } else {
                    *packet
                };
                let responses = handler
                    .resource_manager
                    .handle_packet(&packet_for_manager, &mut link);
                let events = handler.resource_manager.drain_events();
                drop(link);
                for response in responses {
                    handler.send_packet(response).await;
                }
                for event in events {
                    let _ = handler.resource_events_tx.send(event);
                }
                return;
            }
        }

        eprintln!(
            "[tp] link_data dst={} ctx={:02x} len={}",
            packet.destination,
            packet.context as u8,
            packet.data.len()
        );
        let mut link_packets = Vec::new();
        if let Some(link) = handler.in_links.get(&packet.destination).cloned() {
            let mut link = link.lock().await;
            let result = link.handle_packet(packet);
            if let LinkHandleResult::KeepAlive = result {
                link_packets.push(link.keep_alive_packet(KEEP_ALIVE_RESPONSE));
            } else if let LinkHandleResult::Proof(proof_packet) = result {
                link_packets.push(proof_packet);
            }
        }

        let mut proof_packets = Vec::new();
        for link in handler.out_links.values() {
            let mut link = link.lock().await;
            let result = link.handle_packet(packet);
            if let LinkHandleResult::Proof(proof_packet) = result {
                proof_packets.push(proof_packet);
            }
            data_handled = true;
        }

        for packet in link_packets {
            handler.send_packet(packet).await;
        }
        for packet in proof_packets {
            handler.send_packet(packet).await;
        }

        if handle_keepalive_response(packet, &mut handler).await {
            return;
        }

        let lookup = handler.link_table.original_destination(&packet.destination);
        if lookup.is_some() {
            let sent = send_to_next_hop(packet, &handler, lookup).await;

            log::trace!(
                "tp({}): {} packet to remote link {}",
                handler.config.name,
                if sent {
                    "forwarded"
                } else {
                    "could not forward"
                },
                packet.destination
            );
        }
    }

    if packet.header.destination_type == DestinationType::Single {
        if let Some(destination) = handler
            .single_in_destinations
            .get(&packet.destination)
            .cloned()
        {
            data_handled = true;
            let mut ratchet_used = false;
            let payload = if should_encrypt_packet(packet) {
                let mut destination = destination.lock().await;
                match destination.decrypt_with_ratchets(packet.data.as_slice()) {
                    Ok((plaintext, used)) => {
                        ratchet_used = used;
                        plaintext
                    }
                    Err(err) => {
                        log::warn!(
                            "tp({}): decrypt failed for {}: {:?}",
                            handler.config.name,
                            packet.destination,
                            err
                        );
                        return;
                    }
                }
            } else {
                packet.data.as_slice().to_vec()
            };
            let mut buffer = PacketDataBuffer::new();
            if buffer.write(&payload).is_err() {
                log::warn!(
                    "tp({}): decrypted payload too large for {}",
                    handler.config.name,
                    packet.destination
                );
                return;
            }
            handler
                .received_data_tx
                .send(ReceivedData {
                    destination: packet.destination,
                    data: buffer,
                    ratchet_used,
                })
                .ok();
        } else {
            data_handled = send_to_next_hop(packet, &handler, None).await;
        }
    }

    if data_handled {
        log::trace!(
            "tp({}): handle data request for {} dst={:2x} ctx={:2x}",
            handler.config.name,
            packet.destination,
            packet.header.destination_type as u8,
            packet.context as u8,
        );
    }
}