// gstuff 0.8.16
//
// Small macros and trinkets that make my life easier.
// Documentation
// https://www.bittorrent.org/beps/bep_0005.html DHT Protocol
// https://www.bittorrent.org/beps/bep_0024.html Tracker Returns External IP

//⚆ run from web.rs, to see long-term incoming packets in the log
//⚆ incoming and outgoing packets as serde structs, allowing for hot reload / persistence
//  🌵 one idea that comes to mind is to run DHT episodically between invocations:
//     imagine a cron script that maintains a DHT despite being absent at times! 🪃
//     might be suboptimal for a full node due to short DHT timeouts (2 seconds in libtorrent)
//     but feasible for a read-only node, for it to register late find_node and ping replies;
//     consequently the input packet queue should include the timestamp,
//     in order for late handling not to affect peer statistics;
//     a separate UDP server can listen on the port then, saving the packets for when the script is invoked;
//     running a script every five or ten seconds should be all right,
//     and it might also be useful to run the code when a certain number of incoming packets have accumulated

//⚆ when we schedule a packet, we should note down the “t” we've used, in order to recognize the reply;
// will save us also from unnecessarily calling that node again before a reply or a timeout;
// it is a “running” timeout in the sense that we need not clear it unless we actually picked that node
// for another transmission; as such, might implement it as a timestamp;
// ergo, let “t” itself be a timestamp! (base62 delta from a stateful epoch)

//⚆ keep a running average of the minimal (fastest) reply delay observed,
// as it might be useful for congestion control

//⚆ keep track of the IP:port information received in “ip”, a kind of auto-expiring histogram,
// but use timestamps in order to discard outdated results (possibly inherited via `include_bytes!`, etc)

//⚆ gather some stats on which *versions* perform on BEP 44 best

//⚆ a way for call site to prioritize certain peers, establishing layers of trust,
// in order for routing to work
// (or else prioritize by ID distance and group our peers with a custom ID prefix)
//⚆ support call site injecting extra payload into ping and extracting from pongs,
// allowing custom peers to recognize each other and to exchange information under pretence
// of simple DHT maintenance; might also be a good idea to doctor the “t” (transaction ID)
// and “v”, https://stackoverflow.com/questions/9343828/mainline-dht-unspecified-entry-in-bencoded-dictionary
// to match a certain digest over ID, f (node_id, secret) = (t, v)
// or be a part of a public key, f (node_id, t, v, secret) = public

//⚆ answer ping
//⚆ answer find_node
//⚆ “check whether the ID in the PONG message is the same as the one in the routing table”
//⚆ verify (read back) that BEP 44 is stored and strongly prefer (retaining) such nodes
//⚆ store BEP 44 on different IPs and not just IDs
//⚆ keep track of reply speed (and availability)
//⚆ make read-only “ro: 1” optional

//⚆ implement peer lookup

// port 1? https://github.com/transmission/transmission/issues/527

// libtorrent will not attempt to make outgoing connections to peers whose port is < 1024. This is a safety precaution to avoid being part of a DDoS attack

#![allow (dead_code)]
#![allow (non_camel_case_types)]

use bt_bencode::Value as Ben;
use crate::{binprint, now_ms};
use crate::re::Re;
use fomat_macros::{fomat, pintln};
use rand::{Rng, RngCore, SeedableRng, rngs::SmallRng};
use serde_bytes::ByteBuf;
use serde_derive::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::borrow::Cow;
use std::mem::MaybeUninit;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, ToSocketAddrs, UdpSocket};
use std::thread;
use std::time::Duration;

/// Smallest argument dictionary: nothing but the sender's `id`
/// (the sample "ping" query in `mainline` is built from it).
#[derive (Debug, Deserialize, Serialize)]
struct Args {
  /// Node ID of the sender; `mainline` fills it with the 20 bytes of `MainlineSt::node_id`
  id: ByteBuf,
}

/// Argument dictionary carrying a packed list of nodes next to the sender's `id`.
#[derive (Debug, Deserialize, Serialize)]
struct ArgNodes {
  /// Node ID of the sender
  id: ByteBuf,
  /// Packed node records; `mainline` reads them as 26-byte entries:
  /// 20 bytes of node ID, 4 bytes of IPv4 address, 2 bytes of big-endian port
  nodes: ByteBuf,
}

/// Argument dictionary of a query that looks for a `target` (used by `mainline` for "find_node").
#[derive (Debug, Deserialize, Serialize)]
struct ArgTarget {
  /// Node ID of the sender
  id: ByteBuf,
  /// ID that is being searched for
  target: ByteBuf,
}

/// Any of the known argument dictionaries.
///
/// The enum is untagged, hence the variant is picked by the fields present.
/// NB: Keep the bare `Args` last: it only asks for `id`
/// and would otherwise shadow the richer variants listed before it.
#[derive (Debug, Deserialize, Serialize)]
#[serde (untagged)]
enum ArgsE {
  /// `id` and `nodes`
  ArgNodes (ArgNodes),
  /// `id` and `target`
  ArgTarget (ArgTarget),
  /// `id` alone
  Args (Args),
}

/// Outgoing query, serialized by `mainline` with `bt_bencode`.
#[derive (Debug, Serialize)]
struct Msg<'a> {
  /// Message kind: "q" for query, "r" for response, "e" for error
  #[serde (borrow)]
  y: &'a str,

  /// Transaction ID. Should be encoded as a short string of binary numbers,
  /// typically 2 characters are enough as they cover 2^16 outstanding queries
  #[serde (borrow)]
  t: &'a str,

  /// Query name: "ping", "find_node", "get_peers", "announce_peer"
  #[serde (borrow)]
  q: &'a str,

  /// Read-only flag, `mainline` sends `Some (1)`; left out of the packet when `None`
  #[serde (skip_serializing_if = "Option::is_none")]
  ro: Option<i8>,

  /// Arguments of the query
  a: ArgsE,
}

/// Incoming reply, borrowed from the packet it was parsed from.
#[derive (Debug, Deserialize)]
struct Rep<'a> {
  /// Values returned by the remote node
  r: ArgsE,

  /// Transaction ID
  #[serde (borrow)]
  t: &'a str,

  /// Optional "v" entry (presumably the client version, cf. the notes at the top of the file)
  #[serde (borrow)]
  v: Option<&'a [u8]>,

  /// Message kind, see `Msg::y`
  #[serde (borrow)]
  y: &'a str,

  /// Our IP and port, a kind of STUN for hole punching (BEP 24)
  #[serde (borrow)]
  ip: Option<&'a [u8]>,
}

/// State of the DHT node, kept by the call site between the `mainline` invocations.
#[derive (Debug, Deserialize, Serialize)]
pub struct MainlineSt {
  /// Our 160-bit node ID
  node_id: [u8; 20],
  /// Raised by `mainline` once the bootstrap "find_node" queries have been scheduled
  qwe: bool,
}
impl MainlineSt {
  /// Fresh state: a pseudo-random `node_id` (the generator is seeded with `now_ms`)
  /// and the bootstrap flag lowered.
  pub fn new() -> MainlineSt {
    let seed = now_ms();
    let mut prng = SmallRng::seed_from_u64 (seed);
    MainlineSt {
      node_id: prng.gen(),
      qwe: false,
    }
  }
}

/// Incoming packet, as handed to `mainline` by the call site.
pub struct Ip {
  /// Receive buffer; for the size see
  /// https://stackoverflow.com/questions/7067732/what-is-the-maximum-size-of-the-udp-packet-which-is-sent-by-the-mainline-dht-nod
  pub packet: [u8; 1438],
  /// Number of bytes actually received into `packet`
  pub len: usize,
  /// Address the packet came from
  pub from: SocketAddr,
}

/// Outgoing packet, scheduled by `mainline` for the call site to transmit.
pub struct Op {
  /// Bencoded payload
  pub packet: SmallVec<[u8; 1438]>,
  /// Destination address
  pub to: SocketAddr,
}

/// Runs a single step of the DHT node. The UDP socket is not touched here:
/// packets arrive through `ips` and leave through `ops`.
///
/// On the first invocation for a given `st` (while `st.qwe` is still lowered) a "find_node" query
/// for our own ID is scheduled to every IPv4 address of the bootstrap seeds;
/// NB: this resolves the seed host names, which might block on DNS.
///
/// Then every queued incoming packet is logged, and the nodes found in replies are printed.
///
/// * `_now` - Current time, milliseconds (the benchmark passes `now_ms()`); not used yet.
/// * `st` - Node state, kept by the call site between the invocations.
/// * `ips` - Received packets; the queue is drained.
/// * `ops` - Outgoing packets are appended here.
///
/// Fails if the bootstrap query can not be bencoded.
/// Malformed incoming packets are not errors, they are skipped.
pub fn mainline (_now: u64, st: &mut MainlineSt, ips: &mut Vec<Ip>, ops: &mut Vec<Op>) -> Re<()> {
  //⌥ ping the seeds in order to get their IDs, in order to add them into the routing table,
  // BUT, mark them with a “seed” flag, in order to avoid calling them in the future,
  // given that interacting with the known BitTorrent seeds can negatively affect the connection;
  // then again, we should likely avoid the “6881” port, automatically including the seeds
  // 6881 ..= 6889 per https://www.speedguide.net/port.php?port=6881

  /*
  let msg = Msg {
    y: "q",
    t: "123",
    q: "ping",
    ro: Some (1),
    a: ArgsE::Args (Args {
      id: ByteBuf::from (Vec::from (&st.node_id[..]))})};

  let msg = bt_bencode::to_vec (&msg)?;
  //pintln! ((binprint (&msg, b'.')));
  */

  if !st.qwe {
    st.qwe = true;

    let msg = Msg {
      y: "q",
      t: "123",
      q: "find_node",
      ro: Some (1),
      a: ArgsE::ArgTarget (ArgTarget {
        id: ByteBuf::from (Vec::from (&st.node_id[..])),
        //⚆ ask for a random ID instead;
        // or rather semi-random: we want to have a good map of the routing space around us,
        // in order to reduce the number of peer lookup roundtrips when using grouped node IDs;
        // hence keep the prefix, but randomize a small suffix
        target: ByteBuf::from (Vec::from (&st.node_id[..]))})};

    //⚆ helpers for a command-line tool that would dump the routing table to be embedded with `include_bytes!`,
    // bootstrap nodes included, in order not to depend on the DNS resolution
    //⚆ delegating routing table persistence to a call site should auto-magically help
    //⚆ should generate a new `node_id` when reusing a state with `include_bytes!`; consider
    // generating a new `node_id` whenever the *hostname* changes
    //⚆ on the other hand, we should support a call site providing a custom `node_id`,
    // in order to group known peers at a given corner of the routing space,
    // reducing the roundtrips required for peer lookup

    //⚆ name resolution should be a separate step, to avoid duplicate seed requests
    //⚆ consider tracking seed domain name in the routing table and only resorting to DNS if existing
    // records aren't giving a reply (a last resort)

    let mut packet = SmallVec::<[u8; 1438]>::new();
    bt_bencode::to_writer (&mut packet, &msg)?;
    pintln! ((binprint (&packet, b'.')));
    let seeds = [
      // https://blog.libtorrent.org/2016/09/dht-bootstrap-node/
      ("dht.libtorrent.org", 25401),
        ("185.157.221.247", 25401),
      ("router.bittorrent.com", 6881),
        ("67.215.246.10", 6881),
      ("dht.transmissionbt.com", 6881)];
    for seed in seeds {
      match seed.to_socket_addrs() {
        Ok (addrs) => for to in addrs {
          pintln! ([=to]);
          // the compact node format handled below is IPv4-only
          if !to.is_ipv4() {continue}
          ops.push (Op {packet: packet.clone(), to})},
        // a seed failing to resolve is not fatal, the rest might still answer
        Err (err) => {pintln! ([=seed] ": " (err))}}}
  }

  for ip in ips.drain (..) {
    // `len` is supplied by the call site: skip the entry instead of panicking when it does not fit the buffer
    let packet = match ip.packet.get (.. ip.len) {
      Some (packet) => packet,
      None => {pintln! ([=ip.from] ": packet length " (ip.len) " exceeds the buffer"); continue}};
    pintln! ([=ip.from] ' ' (binprint (packet, b'.')));
    if let Ok (ben) = bt_bencode::from_slice::<Ben> (packet) {pintln! ([ben])}
    if let Ok (re1) = bt_bencode::from_slice::<Rep> (packet) {
      pintln! ([=re1]);
      if let ArgsE::ArgNodes (ref r) = re1.r {
        // “nodes” is controlled by the remote peer and need not be a multiple of 26 bytes:
        // `chunks_exact` drops a truncated trailing record, whereas slicing it would panic
        for node in r.nodes.chunks_exact (26) {
          // compact node info: 20 bytes of ID, 4 bytes of IPv4, 2 bytes of big-endian port
          let _id = &node[0..20];
          let ip = Ipv4Addr::new (node[20], node[21], node[22], node[23]);
          let port = u16::from_be_bytes ([node[24], node[25]]);
          pintln! ([=ip] ' ' [=port]);
        }
      }
    }
  }

  Re::Ok(())}

#[cfg(all(test, feature = "nightly"))] mod test {
  extern crate test;
  use crate::now_ms;
  use crate::re::Re;
  use std::net::{SocketAddr, UdpSocket};
  use std::thread;
  use std::time::Duration;
  use super::{Ip, MainlineSt, Op};

  /// Drives `super::mainline` over a real, non-blocking UDP socket bound to port 1234:
  /// first under the benchmark harness, then for a dozen more steps paced at 0.31 seconds,
  /// giving the replies a chance to arrive and be printed.
  #[bench] fn mainline (bm: &mut test::Bencher) {
    let addr = SocketAddr::from (([0, 0, 0, 0], 1234));
    let udp = UdpSocket::bind (&addr) .expect ("!bind");
    udp.set_nonblocking (true) .expect ("!nonblocking");
    let mut st = MainlineSt::new();
    let mut ips: Vec<Ip> = Vec::new();
    let mut ops: Vec<Op> = Vec::new();

    /// One round: fetch at most one datagram, run the state machine, transmit whatever it has scheduled.
    fn step (udp: &UdpSocket, st: &mut MainlineSt, ips: &mut Vec<Ip>, ops: &mut Vec<Op>) -> Re<()> {
      // Zero-filled: conjuring the array with `MaybeUninit::uninit().assume_init()` is undefined behaviour
      let mut packet = [0u8; 1438];
      // The socket is non-blocking, an error here usually means there is nothing to read yet
      if let Ok ((len, from)) = udp.recv_from (&mut packet) {
        ips.push (Ip {packet, len, from})}
      super::mainline (now_ms(), st, ips, ops)?;
      for op in ops.drain (..) {
        udp.send_to (&op.packet, op.to)?;}
      Re::Ok(())}

    bm.iter (|| step (&udp, &mut st, &mut ips, &mut ops) .expect ("!step"));
    for _ in 0 .. 12 {
      step (&udp, &mut st, &mut ips, &mut ops) .expect ("!step");
      thread::sleep (Duration::from_secs_f32 (0.31))}}}