netwatch-sdk 0.4.0

Shared wire-format types and collectors for NetWatch Cloud — the SDK consumed by netwatch-agent and the NetWatch Cloud server. Parses /proc, ss, lsof, nettop, and libpcap events into a common Snapshot payload.
Documentation
//! Decoded events delivered to userspace consumers.
//!
//! These types are the public face of the eBPF source. They differ from
//! the on-the-wire `crate::wire::*Event` structs in two important
//! ways:
//!
//! 1. **Address fields are `IpAddr` in host byte order.** The kernel
//!    writes raw network-byte-order words (matching `struct sock`); the
//!    userspace decoder converts before pushing onto the channel.
//!    Consumers should not have to think about endianness — or about
//!    which address family the kprobe fired for: v4-mapped IPv6
//!    destinations (`::ffff:a.b.c.d`, i.e. IPv4 traffic over a dual-stack
//!    socket) are canonicalised back to `IpAddr::V4` so cache keys match
//!    what `/proc`/lsof report for the same connection.
//! 2. **`comm` is a `String`.** The kernel writes a 16-byte NUL-padded
//!    array; userspace trims and validates UTF-8 once.
//!
//! The on-the-wire types live in `crate::wire` (see that module for the
//! contract that the BPF-side copy in `crates/ebpf-programs` mirrors).

use chrono::{DateTime, TimeZone, Utc};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

/// An event surfaced by the eBPF source.
///
/// The enum is `#[non_exhaustive]` because later roadmap phases will add
/// more variants (`AcceptEvent`, `CloseEvent`, `RetransmitEvent`, …), so
/// downstream `match` expressions need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum EbpfEvent {
    /// The kernel fired a connect: TCP via `tcp_v{4,6}_connect`, or
    /// connected UDP via `ip{4,6}_datagram_connect`. Inspect
    /// `ConnectEvent::daddr` to tell IPv4 from IPv6 and
    /// `ConnectEvent::protocol` to tell TCP from UDP.
    Connect(ConnectEvent),
}

/// Transport on which a connect event was observed.
///
/// `Udp` events originate from the datagram-connect kprobes, i.e. a
/// *connected* UDP socket (the pattern QUIC clients use). UDP traffic sent
/// with `sendto`/`sendmsg` on an unconnected socket is not attributed yet.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum Protocol {
    /// TCP, reported by the `tcp_v{4,6}_connect` kprobes.
    Tcp,
    /// Connected UDP, reported by the `ip{4,6}_datagram_connect` kprobes.
    Udp,
}

/// A connect attempt issued by a local TCP or connected-UDP socket.
///
/// Each value corresponds to one hit of the `tcp_v{4,6}_connect` or
/// `ip{4,6}_datagram_connect` kprobe. Those probes fire *before* the first
/// packet is sent, so this event alone does not say whether a TCP
/// connection was established; that will be observable through a future
/// `inet_sock_set_state` event.
#[derive(Debug, Clone)]
pub struct ConnectEvent {
    /// Whether the event came from TCP or from a connected UDP socket.
    pub protocol: Protocol,
    /// Thread-group id of the calling task (what userspace calls the PID).
    pub tgid: u32,
    /// Kernel thread id of the calling task.
    pub pid: u32,
    /// Command name of the task (`task_struct->comm`, 16 bytes in the
    /// kernel), with trailing NULs removed.
    pub comm: String,
    /// Source address. Unspecified (`0.0.0.0` / `::`) when the socket had
    /// not been bound yet when the kprobe fired, which is always the case
    /// in Phases 1–2 because they probe connect entry.
    pub saddr: IpAddr,
    /// Destination address. v4-mapped IPv6 destinations are reported as
    /// `IpAddr::V4`.
    pub daddr: IpAddr,
    /// Source port in host byte order. Always **0 in Phases 1–2**: the
    /// field lives at an offset that needs CO-RE to read, which arrives in
    /// a later iteration (see `crates/ebpf-programs/src/main.rs`).
    pub sport: u16,
    /// Destination port in host byte order.
    pub dport: u16,
    /// When the event was captured. Userspace computes it by adding the
    /// kernel's boot-relative clock reading to the estimated boot time,
    /// giving a wall-clock `DateTime<Utc>`.
    pub timestamp: DateTime<Utc>,
}

// Decoders are called from the ring-buffer reader, which only exists on
// Linux — allow dead_code so non-Linux builds with `--features ebpf`
// stay warning-free (same treatment as `estimate_boot_time`).
#[allow(dead_code)]
impl ConnectEvent {
    /// Decode a `crate::wire::ConnectV4Event` from the BPF ring buffer
    /// into the public, host-byte-order representation.
    ///
    /// `boot_time` is the wall-clock instant when the kernel booted. The
    /// per-event kernel timestamp (nanoseconds since boot) is added to it
    /// to produce `timestamp`. `protocol` is supplied by the dispatcher,
    /// because the TCP and UDP connect kprobes share this wire layout.
    pub(crate) fn decode_v4(
        raw: &crate::wire::ConnectV4Event,
        boot_time: DateTime<Utc>,
        protocol: Protocol,
    ) -> Self {
        // Each address is stored as a network-byte-order word, so its
        // in-memory bytes (`to_ne_bytes`, independent of host endianness)
        // are already in IP-octet order, which `Ipv4Addr::from([u8; 4])`
        // expects.
        let saddr = IpAddr::V4(Ipv4Addr::from(raw.saddr.to_ne_bytes()));
        let daddr = IpAddr::V4(Ipv4Addr::from(raw.daddr.to_ne_bytes()));

        Self {
            protocol,
            tgid: raw.tgid,
            pid: raw.pid,
            comm: decode_comm(&raw.comm),
            saddr,
            daddr,
            // Ports are big-endian (network order) on the wire.
            sport: u16::from_be(raw.sport),
            dport: u16::from_be(raw.dport),
            timestamp: Self::wall_clock(boot_time, raw.timestamp_ns),
        }
    }

    /// Decode a `crate::wire::ConnectV6Event` (Phase 2).
    ///
    /// Address bytes are already in IP-octet order in the wire struct, so
    /// they map straight into `Ipv6Addr`. v4-mapped addresses
    /// (`::ffff:a.b.c.d`, produced by dual-stack sockets connecting to IPv4
    /// peers through `tcp_v6_connect`) are then canonicalised to
    /// `IpAddr::V4`, so consumers see the same address family that
    /// `/proc`/lsof report. When the destination ends up IPv4 and the source
    /// is the unbound `::`, the source is reported as `0.0.0.0`. The pair
    /// therefore never mixes families and matches what `decode_v4` yields
    /// for an unbound AF_INET socket.
    pub(crate) fn decode_v6(
        raw: &crate::wire::ConnectV6Event,
        boot_time: DateTime<Utc>,
        protocol: Protocol,
    ) -> Self {
        let daddr = Ipv6Addr::from(raw.daddr).to_canonical();
        let saddr = match Ipv6Addr::from(raw.saddr).to_canonical() {
            // An unbound dual-stack socket reports `::` even when talking
            // to an IPv4 peer; keep the pair in one family.
            IpAddr::V6(src) if src.is_unspecified() && daddr.is_ipv4() => {
                IpAddr::V4(Ipv4Addr::UNSPECIFIED)
            }
            src => src,
        };

        Self {
            protocol,
            tgid: raw.tgid,
            pid: raw.pid,
            comm: decode_comm(&raw.comm),
            saddr,
            daddr,
            // Ports are big-endian (network order) on the wire.
            sport: u16::from_be(raw.sport),
            dport: u16::from_be(raw.dport),
            timestamp: Self::wall_clock(boot_time, raw.timestamp_ns),
        }
    }

    /// Convert a kernel nanoseconds-since-boot reading into wall-clock time.
    ///
    /// Values that do not fit in `i64` (over ~292 years of uptime) saturate
    /// instead of wrapping to a negative offset, as an `as` cast would.
    fn wall_clock(boot_time: DateTime<Utc>, ktime_ns: impl TryInto<i64>) -> DateTime<Utc> {
        let offset_ns = ktime_ns.try_into().unwrap_or(i64::MAX);
        boot_time + chrono::Duration::nanoseconds(offset_ns)
    }
}

/// Convert a NUL-padded `task_struct->comm` buffer into an owned `String`.
///
/// Bytes from the first NUL onward are discarded; a buffer with no NUL is
/// used in full. Invalid UTF-8 sequences become U+FFFD rather than failing.
// Only reached from the Linux ring-buffer decoders; allowed so non-Linux
// builds stay warning-free.
#[allow(dead_code)]
fn decode_comm(comm: &[u8]) -> String {
    let name = comm.split(|&byte| byte == 0).next().unwrap_or_default();
    String::from_utf8_lossy(name).into_owned()
}

/// Estimate the wall-clock instant at which the kernel booted.
///
/// BPF programs timestamp events with a boot-relative clock
/// (`bpf_ktime_get_ns`). Adding that offset to this instant yields an
/// approximate wall-clock time for each event.
///
/// On Linux the uptime is read from `/proc/uptime` and subtracted from the
/// current time. When that is unavailable (read or parse failure, or any
/// non-Linux target, where the BPF source itself does not exist), the
/// current time is returned instead. In that case decoded timestamps land
/// later than the true time by roughly the kernel uptime. This function
/// never panics.
///
/// NOTE(review): `bpf_ktime_get_ns` excludes time spent suspended, while
/// `/proc/uptime` includes it, so on a host that has suspended the estimate
/// drifts. Confirm which clock helper the BPF side uses
/// (`bpf_ktime_get_boot_ns` is the one that matches `/proc/uptime`).
// Only called from the Linux ring-buffer reader; allowed so non-Linux
// builds stay warning-free.
#[allow(dead_code)]
pub(crate) fn estimate_boot_time() -> DateTime<Utc> {
    #[cfg(target_os = "linux")]
    let uptime = std::fs::read_to_string("/proc/uptime")
        .ok()
        .and_then(|text| text.split_whitespace().next()?.parse::<f64>().ok())
        // Rejects negative, NaN and out-of-range readings up front instead
        // of panicking later in the date arithmetic.
        .and_then(|secs| std::time::Duration::try_from_secs_f64(secs).ok())
        .unwrap_or_default();
    #[cfg(not(target_os = "linux"))]
    let uptime = std::time::Duration::ZERO;

    // Do the subtraction in `std::time` so every failure mode degrades to
    // a fallback value rather than a panic.
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let boot = since_epoch.saturating_sub(uptime);
    i64::try_from(boot.as_secs())
        .ok()
        .and_then(|secs| Utc.timestamp_opt(secs, boot.subsec_nanos()).single())
        .unwrap_or_else(Utc::now)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::wire::{ConnectV4Event, ConnectV6Event, EventKind, COMM_LEN};

    #[test]
    fn decode_v4_converts_addresses_and_ports_to_host_order() {
        let mut comm = [0u8; COMM_LEN];
        comm[..4].copy_from_slice(b"curl");
        let raw = ConnectV4Event {
            kind: EventKind::TcpV4Connect,
            _pad0: [0; 3],
            tgid: 1234,
            pid: 1235,
            // The kernel writes IP bytes in network order to the u32's
            // memory location. `from_ne_bytes` reproduces that layout
            // regardless of host endianness, so the test is portable.
            saddr: u32::from_ne_bytes([192, 168, 1, 10]),
            daddr: u32::from_ne_bytes([1, 1, 1, 1]),
            sport: 0,
            // Port 443 in network byte order is the bytes [0x01, 0xBB].
            dport: u16::from_ne_bytes([0x01, 0xBB]),
            comm,
            timestamp_ns: 1_000_000_000,
        };
        let boot = Utc.timestamp_opt(1_700_000_000, 0).unwrap();
        let ev = ConnectEvent::decode_v4(&raw, boot, Protocol::Tcp);

        assert_eq!(ev.protocol, Protocol::Tcp);
        assert_eq!(ev.pid, 1235);
        assert_eq!(ev.tgid, 1234);
        assert_eq!(ev.comm, "curl");
        assert_eq!(ev.saddr, IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10)));
        assert_eq!(ev.daddr, IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)));
        assert_eq!(ev.sport, 0);
        assert_eq!(ev.dport, 443);
        // boot + 1s
        assert_eq!(ev.timestamp.timestamp(), 1_700_000_001);
    }

    #[test]
    fn decode_handles_short_comm_without_panicking() {
        let raw = ConnectV4Event::empty();
        let ev = ConnectEvent::decode_v4(&raw, Utc::now(), Protocol::Tcp);
        assert_eq!(ev.comm, "");
    }

    #[test]
    fn decode_comm_uses_the_whole_buffer_when_there_is_no_nul() {
        // Defensive: a name filling the entire buffer has no terminating
        // NUL, and must be decoded in full rather than truncated.
        let comm = [b'x'; COMM_LEN];
        assert_eq!(decode_comm(&comm), "x".repeat(COMM_LEN));
    }

    #[test]
    fn decode_comm_replaces_invalid_utf8_instead_of_failing() {
        assert_eq!(decode_comm(&[b'a', 0xff, 0, b'z']), "a\u{FFFD}");
    }

    #[test]
    fn decode_tags_udp_protocol_from_the_datagram_connect_kinds() {
        // The UDP datagram-connect kprobes reuse the ConnectV{4,6}Event
        // layout; only the protocol passed by the dispatcher differs.
        let v4 = ConnectEvent::decode_v4(&ConnectV4Event::empty(), Utc::now(), Protocol::Udp);
        assert_eq!(v4.protocol, Protocol::Udp);
        let v6 = ConnectEvent::decode_v6(&ConnectV6Event::empty(), Utc::now(), Protocol::Udp);
        assert_eq!(v6.protocol, Protocol::Udp);
    }

    #[test]
    fn decode_v6_converts_address_and_port() {
        let mut comm = [0u8; COMM_LEN];
        comm[..4].copy_from_slice(b"curl");
        let mut raw = ConnectV6Event::empty();
        raw.tgid = 42;
        raw.pid = 43;
        raw.comm = comm;
        // 2606:4700::6810:85e5 (cloudflare), already in IP-octet order.
        raw.daddr = [
            0x26, 0x06, 0x47, 0x00, 0, 0, 0, 0, 0, 0, 0, 0, 0x68, 0x10, 0x85, 0xe5,
        ];
        raw.dport = u16::from_ne_bytes([0x01, 0xBB]); // 443, network order
        raw.timestamp_ns = 2_000_000_000;

        let boot = Utc.timestamp_opt(1_700_000_000, 0).unwrap();
        let ev = ConnectEvent::decode_v6(&raw, boot, Protocol::Tcp);

        assert_eq!(ev.pid, 43);
        assert_eq!(ev.comm, "curl");
        assert_eq!(
            ev.daddr,
            "2606:4700::6810:85e5".parse::<IpAddr>().unwrap()
        );
        assert_eq!(ev.dport, 443);
        // The unbound source decodes as unspecified `::`.
        assert!(ev.saddr.is_unspecified());
        assert_eq!(ev.timestamp.timestamp(), 1_700_000_002);
    }

    #[test]
    fn decode_v6_canonicalises_v4_mapped_destinations() {
        let mut raw = ConnectV6Event::empty();
        // ::ffff:93.184.216.34 — IPv4 peer reached through a dual-stack
        // socket; tcp_v6_connect sees the v4-mapped form.
        raw.daddr = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, 93, 184, 216, 34];
        raw.dport = u16::from_ne_bytes([0x00, 0x50]); // 80

        let ev = ConnectEvent::decode_v6(&raw, Utc.timestamp_opt(0, 0).unwrap(), Protocol::Tcp);

        assert_eq!(ev.daddr, IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)));
        assert_eq!(ev.dport, 80);
    }
}