// orb8-agent 0.0.4
//
// Node agent for orb8 (DaemonSet)
//! Pod metadata cache for correlating cgroup IDs and pod IPs to Kubernetes pods
//!
//! This module maintains concurrent maps from cgroup IDs and pod IP addresses
//! to pod metadata, allowing the agent to enrich eBPF events with Kubernetes context.

use dashmap::DashMap;
use std::sync::Arc;

/// Metadata about a Kubernetes pod container
///
/// One value describes a single container together with the pod it runs in.
/// Several containers of the same pod share `pod_uid` (and `pod_ip`) while
/// differing in `container_name` / `container_id`.
///
/// Values can be compared with `==`; two values are equal only when every
/// field is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PodMetadata {
    /// Namespace of the pod (e.g. `"default"`).
    pub namespace: String,
    /// Name of the pod.
    pub pod_name: String,
    /// Unique ID of the pod; the key used to evict every entry of a pod at once.
    pub pod_uid: String,
    /// Name of the container inside the pod.
    pub container_name: String,
    /// Identifier of the container.
    pub container_id: String,
    /// Pod IP packed into a `u32` (e.g. `0x0A00_0005` stands for 10.0.0.5),
    /// or `None` when no IP is known, in which case the pod is not indexed by IP.
    pub pod_ip: Option<u32>,
}

/// Thread-safe cache mapping cgroup IDs and IPs to pod metadata
///
/// Both indexes are held behind `Arc`, so cloning a `PodCache` produces
/// another handle to the same underlying maps rather than a copy of the
/// entries.
#[derive(Clone)]
pub struct PodCache {
    /// Index keyed by cgroup ID.
    by_cgroup: Arc<DashMap<u64, PodMetadata>>,
    /// Index keyed by pod IP. There is one slot per IP, so a later insert for
    /// the same IP replaces the metadata stored earlier.
    by_ip: Arc<DashMap<u32, PodMetadata>>,
}

impl PodCache {
    /// Build a cache whose cgroup and IP indexes are both empty.
    pub fn new() -> Self {
        let by_cgroup = Arc::new(DashMap::new());
        let by_ip = Arc::new(DashMap::new());
        Self { by_cgroup, by_ip }
    }

    /// Store `metadata` under `cgroup_id`, replacing any previous entry.
    ///
    /// When the metadata carries a pod IP, a copy is also stored in the IP
    /// index under that address.
    pub fn insert(&self, cgroup_id: u64, metadata: PodMetadata) {
        // The IP index needs its own copy; the original is moved into the
        // cgroup index afterwards.
        let ip_entry = metadata.pod_ip.map(|ip| (ip, metadata.clone()));
        if let Some((ip, pod)) = ip_entry {
            self.by_ip.insert(ip, pod);
        }

        self.by_cgroup.insert(cgroup_id, metadata);
    }

    /// Store `metadata` in the IP index only (preferred for IP-based enrichment).
    ///
    /// Metadata without a pod IP has no key to be stored under and is dropped.
    pub fn insert_by_ip(&self, metadata: PodMetadata) {
        let Some(ip) = metadata.pod_ip else {
            return;
        };
        self.by_ip.insert(ip, metadata);
    }

    /// Return a copy of the metadata registered for `cgroup_id`, if any.
    pub fn get(&self, cgroup_id: u64) -> Option<PodMetadata> {
        let entry = self.by_cgroup.get(&cgroup_id)?;
        Some(entry.value().clone())
    }

    /// Return a copy of the metadata registered for the IP address `ip`, if any.
    pub fn get_by_ip(&self, ip: u32) -> Option<PodMetadata> {
        let entry = self.by_ip.get(&ip)?;
        Some(entry.value().clone())
    }

    /// Drop the entry for `cgroup_id` and hand back its metadata, if it existed.
    ///
    /// Only the cgroup index is touched; the IP index is left as it is.
    pub fn remove(&self, cgroup_id: u64) -> Option<PodMetadata> {
        let (_, removed) = self.by_cgroup.remove(&cgroup_id)?;
        Some(removed)
    }

    /// Evict every entry, in both indexes, whose pod UID equals `pod_uid`.
    pub fn remove_pod(&self, pod_uid: &str) {
        // `retain` keeps the entries for which the closure returns true, so
        // the predicate selects everything that belongs to other pods.
        self.by_cgroup
            .retain(|_cgroup_id, meta| meta.pod_uid.as_str() != pod_uid);
        self.by_ip
            .retain(|_ip, meta| meta.pod_uid.as_str() != pod_uid);
    }

    /// Number of entries in the cgroup index (IP-only entries are not counted).
    pub fn len(&self) -> usize {
        self.by_cgroup.len()
    }

    /// Number of entries in the IP index.
    pub fn ip_entries_count(&self) -> usize {
        self.by_ip.len()
    }

    /// Whether the cgroup index holds no entries; the IP index is not consulted.
    pub fn is_empty(&self) -> bool {
        self.by_cgroup.is_empty()
    }

    /// Copy out every `(cgroup ID, metadata)` pair (for debugging/metrics).
    pub fn entries(&self) -> Vec<(u64, PodMetadata)> {
        let mut snapshot = Vec::new();
        for entry in self.by_cgroup.iter() {
            let cgroup_id = *entry.key();
            let pod = entry.value().clone();
            snapshot.push((cgroup_id, pod));
        }
        snapshot
    }
}

impl Default for PodCache {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `PodMetadata` fixture from string slices and a known pod IP.
    fn pod_fixture(
        namespace: &str,
        pod_name: &str,
        pod_uid: &str,
        container_name: &str,
        container_id: &str,
        pod_ip: u32,
    ) -> PodMetadata {
        PodMetadata {
            namespace: namespace.to_string(),
            pod_name: pod_name.to_string(),
            pod_uid: pod_uid.to_string(),
            container_name: container_name.to_string(),
            container_id: container_id.to_string(),
            pod_ip: Some(pod_ip),
        }
    }

    /// An entry inserted under a cgroup ID can be read back by that ID.
    #[test]
    fn test_pod_cache_insert_get() {
        let cache = PodCache::new();

        // 0x0A000005 is 10.0.0.5
        let nginx = pod_fixture(
            "default",
            "nginx",
            "abc-123",
            "nginx",
            "container123",
            0x0A000005,
        );
        cache.insert(12345, nginx);

        let found = cache.get(12345).expect("Should find entry");
        assert_eq!(found.namespace, "default");
        assert_eq!(found.pod_name, "nginx");
    }

    /// An entry inserted via the IP index is found by its IP and by no other.
    #[test]
    fn test_pod_cache_get_by_ip() {
        let cache = PodCache::new();

        // 0x0A000005 is 10.0.0.5
        let nginx = pod_fixture(
            "default",
            "nginx",
            "abc-123",
            "nginx",
            "container123",
            0x0A000005,
        );
        cache.insert_by_ip(nginx);

        let found = cache.get_by_ip(0x0A000005).expect("Should find by IP");
        assert_eq!(found.namespace, "default");
        assert_eq!(found.pod_name, "nginx");

        assert!(cache.get_by_ip(0x0A000099).is_none());
    }

    /// Removing a pod evicts all of its containers from both indexes and
    /// leaves other pods alone.
    #[test]
    fn test_pod_cache_remove_pod() {
        let cache = PodCache::new();

        // Two containers of pod-1 share one IP; pod-2 is unrelated.
        let nginx = pod_fixture("default", "nginx", "pod-1", "nginx", "c1", 0x0A000001);
        let sidecar = pod_fixture("default", "nginx", "pod-1", "sidecar", "c2", 0x0A000001);
        let redis = pod_fixture("other", "redis", "pod-2", "redis", "c3", 0x0A000002);

        cache.insert(1, nginx);
        cache.insert(2, sidecar);
        cache.insert(3, redis);

        assert_eq!(cache.len(), 3);

        cache.remove_pod("pod-1");

        assert_eq!(cache.len(), 1);
        assert!(cache.get(1).is_none());
        assert!(cache.get(2).is_none());
        assert!(cache.get(3).is_some());
        assert!(cache.get_by_ip(0x0A000001).is_none());
        assert!(cache.get_by_ip(0x0A000002).is_some());
    }
}