//! peerman 0.1.8
//!
//! DN42 peer manager with WireGuard, BIRD, and cluster support
use std::collections::HashMap;

use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use crate::error::AppError;
use crate::models::flap_event::{FlapEvent, FlapEventRepository};

/// Default length, in seconds, of the sliding window over which path changes are counted.
const DEFAULT_WINDOW_SECS: i64 = 60;
/// Default number of changes inside the window at which a prefix is reported as flapping.
const DEFAULT_CHANGE_THRESHOLD: u32 = 10;
/// Default quiet time, in seconds, after which an active flap event is resolved (5 minutes).
const DEFAULT_RESOLVE_AFTER_SECS: i64 = 5 * 60;
/// Period, in seconds, on which `FlapDetector::run` schedules its resolve-and-evict pass.
const CHECK_INTERVAL_SECS: u64 = 30;
/// Idle time, in seconds, after which a tracker with no new change is dropped (10 minutes).
const TRACKER_TTL_SECS: i64 = 10 * 60;

/// In-memory change history for one tracked prefix.
struct PrefixTracker {
    /// The prefix string, copied from the first path change seen for it.
    prefix: String,
    /// Address family label: `"ipv6"` when the prefix contains `:`, otherwise `"ipv4"`.
    prefix_type: String,
    /// Unix timestamps (seconds) of recorded changes; entries older than the
    /// detection window are pruned whenever a new change is recorded.
    changes: Vec<i64>,
    /// Hash of the most recently seen path; a change carrying the same hash is ignored.
    last_path_hash: u64,
    /// Unix timestamp (seconds) of the most recent recorded change; drives tracker eviction.
    last_change_ts: i64,
}

/// Counts path changes per prefix and records flap events through a repository.
pub struct FlapDetector {
    /// Length of the sliding counting window, in seconds.
    window_secs: i64,
    /// Number of changes inside the window at which a flap event is created or refreshed.
    change_threshold: u32,
    /// Quiet time, in seconds, after which an active flap event is resolved.
    resolve_after_secs: i64,
    /// Node id stamped on created events and used when looking up an active event.
    node_id: String,
    /// Persistence layer for flap events.
    repo: FlapEventRepository,
    /// Per-prefix state, keyed by `"<prefix>:<node_id>"` built from the incoming change.
    trackers: HashMap<String, PrefixTracker>,
}

impl FlapDetector {
    /// Builds a detector for `node_id` that persists events through `repo`.
    ///
    /// The window length, change threshold and resolve delay start at their
    /// `DEFAULT_*` values, and no prefixes are tracked yet.
    pub fn new(node_id: String, repo: FlapEventRepository) -> Self {
        let trackers = HashMap::new();
        Self {
            node_id,
            repo,
            trackers,
            window_secs: DEFAULT_WINDOW_SECS,
            change_threshold: DEFAULT_CHANGE_THRESHOLD,
            resolve_after_secs: DEFAULT_RESOLVE_AFTER_SECS,
        }
    }

    /// Drives the detector until `token` is cancelled or the change channel closes.
    ///
    /// Each received path change is handed to `process_change`. Every
    /// `CHECK_INTERVAL_SECS` seconds a housekeeping pass runs `resolve_stale`
    /// followed by `evict_trackers`. Errors from either step are logged as
    /// warnings and do not stop the loop.
    ///
    /// * `rx` - receiving end of the path-change channel.
    /// * `token` - cancellation token; the loop returns once it is cancelled.
    pub async fn run(
        &mut self,
        mut rx: mpsc::Receiver<super::bgp_listener::PathChange>,
        token: CancellationToken,
    ) {
        let period = std::time::Duration::from_secs(CHECK_INTERVAL_SECS);
        // The timer has to outlive a single loop iteration. A sleep created
        // inside the loop is dropped and restarted every time another branch
        // of `select!` completes, so path changes arriving more often than the
        // period would postpone housekeeping forever.
        // `interval_at` keeps the first pass one full period after start-up.
        let mut housekeeping =
            tokio::time::interval_at(tokio::time::Instant::now() + period, period);
        // After a slow pass, wait a full period again rather than firing a burst of ticks.
        housekeeping.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    tracing::info!("Flap detector shutting down");
                    return;
                }
                maybe_change = rx.recv() => {
                    match maybe_change {
                        Some(change) => {
                            if let Err(e) = self.process_change(change).await {
                                tracing::warn!("Flap detector error: {e}");
                            }
                        }
                        None => {
                            tracing::info!("BGP channel closed, flap detector exiting");
                            return;
                        }
                    }
                }
                _ = housekeeping.tick() => {
                    if let Err(e) = self.resolve_stale().await {
                        tracing::warn!("Flap detector resolve error: {e}");
                    }
                    self.evict_trackers();
                }
            }
        }
    }

    /// Drops every tracker whose last recorded change is more than
    /// `TRACKER_TTL_SECS` seconds in the past.
    fn evict_trackers(&mut self) {
        let current_ts = chrono::Utc::now().timestamp();
        self.trackers.retain(|_, tracker| {
            let idle_secs = current_ts - tracker.last_change_ts;
            idle_secs <= TRACKER_TTL_SECS
        });
    }

    /// Records one path change and, once the prefix has changed at least
    /// `change_threshold` times inside the window, creates a flap event or
    /// refreshes the already-active one.
    ///
    /// A change whose path hash equals the last one seen for the same tracker
    /// is ignored. The first change for an unknown key only creates its tracker.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the repository calls.
    async fn process_change(
        &mut self,
        change: super::bgp_listener::PathChange,
    ) -> Result<(), AppError> {
        let observed_at = chrono::Utc::now().timestamp();
        // NOTE(review): trackers are keyed by the change's node id, while the
        // event lookup and creation below use this detector's own `node_id`.
        // Confirm the two always agree.
        let tracker_key = format!("{}:{}", change.prefix, change.node_id);

        let tracker = match self.trackers.get_mut(&tracker_key) {
            Some(known) => known,
            None => {
                // First sighting: start tracking, nothing to evaluate yet.
                let family = if change.prefix.contains(':') {
                    "ipv6"
                } else {
                    "ipv4"
                };
                self.trackers.insert(
                    tracker_key,
                    PrefixTracker {
                        prefix: change.prefix.clone(),
                        prefix_type: family.to_string(),
                        changes: vec![observed_at],
                        last_path_hash: change.path_hash,
                        last_change_ts: observed_at,
                    },
                );
                return Ok(());
            }
        };

        // Same path as last time: not a change.
        if tracker.last_path_hash == change.path_hash {
            return Ok(());
        }

        tracker.last_path_hash = change.path_hash;
        tracker.last_change_ts = observed_at;
        tracker.changes.push(observed_at);
        let window = self.window_secs;
        tracker.changes.retain(|seen| observed_at - *seen <= window);

        let change_count = tracker.changes.len() as u32;
        if change_count < self.change_threshold {
            return Ok(());
        }

        // Copy what the event needs so the tracker borrow ends before the repository calls.
        let flapping_prefix = tracker.prefix.clone();
        let family = tracker.prefix_type.clone();

        let window_start =
            (chrono::Utc::now() - chrono::Duration::seconds(self.window_secs)).to_rfc3339();
        let window_end = chrono::Utc::now().to_rfc3339();

        let already_active = self
            .repo
            .find_active_by_prefix_node(&flapping_prefix, &self.node_id)
            .await?;

        match already_active {
            Some(existing) => {
                self.repo
                    .update_change_count(&existing.id, change_count as i32, &window_end)
                    .await?;
            }
            None => {
                let event = FlapEvent {
                    id: Uuid::new_v4().to_string(),
                    prefix: flapping_prefix,
                    prefix_type: family,
                    node_id: self.node_id.clone(),
                    change_count: change_count as i32,
                    window_start,
                    window_end,
                    source: "ibgp".to_string(),
                    active: true,
                    detected_at: chrono::Utc::now().to_rfc3339(),
                    resolved_at: None,
                };
                self.repo.create(&event).await?;
                tracing::warn!(
                    "Flap detected: {} ({} changes in {}s)",
                    event.prefix,
                    event.change_count,
                    self.window_secs,
                );
            }
        }

        Ok(())
    }

    /// Resolves every active flap event that has been quiet for longer than
    /// `resolve_after_secs`.
    ///
    /// Quiet time is measured from the newest change held by the matching
    /// tracker. If no tracker exists for the event's `"<prefix>:<node_id>"`
    /// key, it is measured from the event's `detected_at` timestamp.
    ///
    /// # Errors
    ///
    /// Propagates repository errors. The first failure ends the pass, leaving
    /// any remaining events untouched.
    async fn resolve_stale(&mut self) -> Result<(), AppError> {
        let now = chrono::Utc::now().timestamp();
        let active_events = self.repo.list_active().await?;

        for event in active_events {
            let tracker_key = format!("{}:{}", event.prefix, event.node_id);
            let last_activity = match self.trackers.get(&tracker_key) {
                Some(tracker) => tracker.changes.last().map_or(0, |ts| *ts),
                // NOTE(review): an unparsable `detected_at` falls back to 0, which
                // makes the event look long stale. Confirm this is intended.
                None => chrono::DateTime::parse_from_rfc3339(&event.detected_at)
                    .map_or(0, |parsed| parsed.timestamp()),
            };

            // Still within the quiet period: leave the event active.
            if now - last_activity <= self.resolve_after_secs {
                continue;
            }

            self.repo.resolve(&event.id).await?;
            tracing::info!("Flap resolved: {} (id={})", event.prefix, event.id);
        }

        Ok(())
    }
}