// focl 0.1.0
//
// focl/focld - lightweight Rust BGP speaker
// Documentation
pub mod layout;
pub mod manifest;
pub mod queue;
pub mod replicator;
pub mod snapshot;
pub mod types;
pub mod writer;

use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, Result};
use chrono::Utc;
use tokio::sync::{broadcast, Mutex};

use crate::archive::layout::{aligned_epoch, segment_paths};
use crate::archive::replicator::Replicator;
use crate::archive::snapshot::{
    build_table_dump_v2, encode_bgp4mp_message_as4, encode_bgp4mp_state_change_as4,
};
use crate::archive::types::{
    ArchiveStatus, ArchiveStream, FinalizedSegment, PeerStateRecordInput, RibSnapshotInput,
    UpdateRecordInput,
};
use crate::archive::writer::SegmentWriter;
use crate::config::{ArchiveConfig, DestinationMode};
use crate::types::{Event, EventEnvelope};

/// Archive front-end: writes update and peer-state records into rotating
/// "updates" segments, produces RIB snapshot segments, and hands finalized
/// segments to an optional replicator.
pub struct ArchiveService {
    /// Archive configuration (enable flag, roots, intervals, destinations).
    cfg: ArchiveConfig,
    /// BGP identifier substituted into snapshots whose own
    /// `collector_bgp_id` is unspecified.
    collector_bgp_id: Ipv4Addr,
    /// Currently open updates segment writer; `None` until a segment is opened.
    updates_writer: Mutex<Option<SegmentWriter>>,
    /// Most recently finalized RIB segment, reported by `status`.
    ribs_last: Mutex<Option<FinalizedSegment>>,
    /// Aligned epoch of the last RIB bucket snapshotted by the scheduler tick.
    last_rib_bucket: Mutex<Option<i64>>,
    /// Replicator for finalized segments; only created when archiving is enabled.
    replicator: Option<Arc<Replicator>>,
    /// Broadcast channel on which archive events are published.
    event_tx: broadcast::Sender<EventEnvelope>,
}

impl ArchiveService {
    pub async fn new(cfg: ArchiveConfig, collector_bgp_id: Ipv4Addr) -> Result<Arc<Self>> {
        let (event_tx, _event_rx) = broadcast::channel(512);

        let replicator = if cfg.enabled {
            std::fs::create_dir_all(&cfg.root)
                .with_context(|| format!("failed creating archive root {}", cfg.root.display()))?;
            std::fs::create_dir_all(&cfg.tmp_root).with_context(|| {
                format!(
                    "failed creating archive tmp root {}",
                    cfg.tmp_root.display()
                )
            })?;
            cleanup_tmp_root(&cfg.tmp_root)
                .with_context(|| format!("failed cleaning tmp root {}", cfg.tmp_root.display()))?;

            let queue = crate::archive::queue::ReplicationQueue::new(&cfg.root)?;
            Some(Arc::new(Replicator::new(
                &cfg,
                queue,
                Some(event_tx.clone()),
            )))
        } else {
            None
        };

        let service = Arc::new(Self {
            cfg,
            collector_bgp_id,
            updates_writer: Mutex::new(None),
            ribs_last: Mutex::new(None),
            last_rib_bucket: Mutex::new(None),
            replicator,
            event_tx,
        });

        if service.cfg.enabled {
            service
                .ensure_updates_writer(Utc::now().timestamp())
                .await?;
            service.spawn_background_tasks();
        }

        Ok(service)
    }

    /// Returns a new receiver subscribed to the archive event channel.
    pub fn subscribe_events(&self) -> broadcast::Receiver<EventEnvelope> {
        self.event_tx.subscribe()
    }

    /// Returns a clone of the sender side of the archive event channel.
    pub fn event_sender(&self) -> broadcast::Sender<EventEnvelope> {
        self.event_tx.clone()
    }

    pub fn destinations(&self) -> Vec<(String, String, String)> {
        self.cfg
            .destinations
            .iter()
            .map(|d| {
                let dtype = match d.destination_type {
                    crate::config::DestinationType::Local => "local",
                    crate::config::DestinationType::S3 => "s3",
                }
                .to_string();
                let mode = match d.mode {
                    DestinationMode::Primary => "primary",
                    DestinationMode::AsyncReplica => "async_replica",
                }
                .to_string();
                (d.destination_key(), mode, dtype)
            })
            .collect()
    }

    /// Encodes `update` as a BGP4MP (AS4) message record and appends it to the
    /// open updates segment.
    ///
    /// Does nothing when archiving is disabled. The updates segment is rotated
    /// first if the record's timestamp falls into a different bucket.
    ///
    /// # Errors
    ///
    /// Fails if rotation fails, the record cannot be encoded, no writer is
    /// open, or the write itself fails.
    pub async fn ingest_update(&self, update: UpdateRecordInput) -> Result<()> {
        if !self.cfg.enabled {
            return Ok(());
        }

        self.ensure_updates_writer(update.timestamp).await?;
        let encoded = encode_bgp4mp_message_as4(&update)?;

        // The guard is a temporary that lives until the end of this statement,
        // so the write happens under the lock.
        self.updates_writer
            .lock()
            .await
            .as_mut()
            .context("updates writer not initialized")?
            .write_record(&encoded)?;

        Ok(())
    }

    /// Encodes `state` as a BGP4MP (AS4) state-change record and appends it to
    /// the open updates segment.
    ///
    /// Does nothing unless archiving is enabled and
    /// `include_peer_state_records` is set. The updates segment is rotated
    /// first if the record's timestamp falls into a different bucket.
    ///
    /// # Errors
    ///
    /// Fails if rotation fails, the record cannot be encoded, no writer is
    /// open, or the write itself fails.
    pub async fn ingest_peer_state(&self, state: PeerStateRecordInput) -> Result<()> {
        let wanted = self.cfg.enabled && self.cfg.include_peer_state_records;
        if !wanted {
            return Ok(());
        }

        self.ensure_updates_writer(state.timestamp).await?;
        let encoded = encode_bgp4mp_state_change_as4(&state)?;

        // The guard is a temporary that lives until the end of this statement,
        // so the write happens under the lock.
        self.updates_writer
            .lock()
            .await
            .as_mut()
            .context("updates writer not initialized")?
            .write_record(&encoded)?;

        Ok(())
    }

    pub async fn snapshot_now(&self, mut input: RibSnapshotInput) -> Result<FinalizedSegment> {
        if !self.cfg.enabled {
            anyhow::bail!("archive is disabled");
        }

        if input.collector_bgp_id == Ipv4Addr::UNSPECIFIED {
            input.collector_bgp_id = self.collector_bgp_id;
        }

        let paths = segment_paths(&self.cfg, ArchiveStream::Ribs, input.timestamp)?;
        self.emit(Event::ArchiveSegmentOpened {
            stream: ArchiveStream::Ribs.as_str().to_string(),
            path: paths.final_path.display().to_string(),
            start_ts: aligned_epoch(input.timestamp, self.cfg.ribs_interval_secs),
        });

        let mut writer = SegmentWriter::new(
            &self.cfg,
            ArchiveStream::Ribs,
            aligned_epoch(input.timestamp, self.cfg.ribs_interval_secs),
            paths,
        )?;

        let records = build_table_dump_v2(&input)?;
        for rec in records {
            writer.write_record(&rec)?;
        }

        let finalized = writer.finalize(input.timestamp)?;
        self.emit(Event::ArchiveSegmentFinalized {
            stream: ArchiveStream::Ribs.as_str().to_string(),
            path: finalized.final_path.display().to_string(),
            end_ts: finalized.end_ts,
            records: finalized.record_count,
        });

        if let Some(replicator) = &self.replicator {
            replicator.enqueue_segment(&finalized)?;
        }

        {
            let mut last = self.ribs_last.lock().await;
            *last = Some(finalized.clone());
        }

        Ok(finalized)
    }

    /// Forces a rollover of the given stream at the current wall-clock time.
    ///
    /// For `Updates` the open segment is finalized and a new one is opened.
    /// For `Ribs` a snapshot of the `"main"` view is written with empty peer
    /// and route lists. Does nothing when archiving is disabled.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the rotation or the snapshot.
    pub async fn rollover(&self, stream: ArchiveStream) -> Result<()> {
        if !self.cfg.enabled {
            return Ok(());
        }

        let now = Utc::now().timestamp();
        match stream {
            ArchiveStream::Updates => self.rotate_updates(now).await?,
            ArchiveStream::Ribs => {
                self.snapshot_now(RibSnapshotInput {
                    timestamp: now,
                    collector_bgp_id: self.collector_bgp_id,
                    view_name: "main".to_string(),
                    peers: vec![],
                    routes: vec![],
                })
                .await?;
            }
        }

        Ok(())
    }

    /// Asks the replicator to retry failed replications and returns the count
    /// it reports; returns `0` when no replicator exists.
    pub async fn retry_failed_replications(&self) -> Result<usize> {
        self.replicator
            .as_ref()
            .map_or(Ok(0), |rep| rep.retry_failed())
    }

    /// Collects a point-in-time status report: configuration values, the open
    /// updates segment, the last finalized RIB segment and replication
    /// counters.
    ///
    /// Counts default to `0` and paths to `None` when the corresponding writer,
    /// segment or replicator is absent.
    ///
    /// # Errors
    ///
    /// Fails if the replication queue cannot report its pending count.
    pub async fn status(&self) -> Result<ArchiveStatus> {
        let updates_guard = self.updates_writer.lock().await;
        let ribs_guard = self.ribs_last.lock().await;

        let (queued, failures) = match self.replicator.as_deref() {
            Some(rep) => (rep.queue().pending_count()?, rep.failures()),
            None => (0, 0),
        };

        let open_writer = updates_guard.as_ref();
        let last_ribs = ribs_guard.as_ref();

        Ok(ArchiveStatus {
            enabled: self.cfg.enabled,
            collector_id: self.cfg.collector_id.clone(),
            updates_interval_secs: self.cfg.updates_interval_secs,
            ribs_interval_secs: self.cfg.ribs_interval_secs,
            updates_open_path: open_writer.map(|w| w.path().to_path_buf()),
            updates_record_count: open_writer.map_or(0, |w| w.record_count()),
            ribs_last_path: last_ribs.map(|seg| seg.final_path.clone()),
            ribs_last_record_count: last_ribs.map_or(0, |seg| seg.record_count),
            queued_replication_jobs: queued,
            replication_failures: failures,
        })
    }

    fn spawn_background_tasks(self: &Arc<Self>) {
        if let Some(replicator) = &self.replicator {
            let rep = Arc::clone(replicator);
            rep.spawn();
        }

        let service = Arc::clone(self);
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(Duration::from_secs(5));
            loop {
                ticker.tick().await;
                if let Err(err) = service.tick().await {
                    tracing::error!(error=%err, "archive scheduler tick failed");
                }
            }
        });
    }

    /// One scheduler pass: rotates the updates segment if the current time has
    /// moved to another bucket, and writes a RIB snapshot the first time a new
    /// RIB bucket is seen.
    ///
    /// The snapshot is taken for the `"main"` view with empty peer and route
    /// lists. The bucket is recorded only after the snapshot succeeded, so a
    /// failed snapshot is attempted again on the next pass.
    async fn tick(&self) -> Result<()> {
        if !self.cfg.enabled {
            return Ok(());
        }

        let now = Utc::now().timestamp();
        self.ensure_updates_writer(now).await?;

        let current_bucket = aligned_epoch(now, self.cfg.ribs_interval_secs);
        // Held across the snapshot so that check and update form one step.
        let mut last_bucket = self.last_rib_bucket.lock().await;
        if *last_bucket == Some(current_bucket) {
            return Ok(());
        }

        self.snapshot_now(RibSnapshotInput {
            timestamp: now,
            collector_bgp_id: self.collector_bgp_id,
            view_name: "main".to_string(),
            peers: vec![],
            routes: vec![],
        })
        .await?;
        *last_bucket = Some(current_bucket);

        Ok(())
    }

    /// Makes sure an updates segment writer is open for the bucket that
    /// contains `now_ts`.
    ///
    /// If the open writer belongs to a different bucket it is finalized,
    /// announced and enqueued for replication before the new segment is
    /// opened. If no writer is open, one is simply created.
    ///
    /// NOTE(review): rotation triggers on any bucket mismatch, including a
    /// `now_ts` that maps to an *earlier* bucket than the open segment —
    /// confirm that late-stamped records are meant to reopen an older bucket.
    async fn ensure_updates_writer(&self, now_ts: i64) -> Result<()> {
        let bucket = aligned_epoch(now_ts, self.cfg.updates_interval_secs);
        let mut slot = self.updates_writer.lock().await;

        // Nothing to do while the open segment already covers this bucket.
        if matches!(slot.as_ref(), Some(open) if open.start_ts() == bucket) {
            return Ok(());
        }

        if let Some(stale) = slot.take() {
            let finalized = stale.finalize(now_ts)?;
            self.emit(Event::ArchiveSegmentFinalized {
                stream: ArchiveStream::Updates.as_str().to_string(),
                path: finalized.final_path.display().to_string(),
                end_ts: finalized.end_ts,
                records: finalized.record_count,
            });
            if let Some(replicator) = self.replicator.as_ref() {
                replicator.enqueue_segment(&finalized)?;
            }
        }

        let paths = segment_paths(&self.cfg, ArchiveStream::Updates, now_ts)?;
        self.emit(Event::ArchiveSegmentOpened {
            stream: ArchiveStream::Updates.as_str().to_string(),
            path: paths.final_path.display().to_string(),
            start_ts: bucket,
        });
        *slot = Some(SegmentWriter::new(
            &self.cfg,
            ArchiveStream::Updates,
            bucket,
            paths,
        )?);

        Ok(())
    }

    async fn rotate_updates(&self, now_ts: i64) -> Result<()> {
        {
            let mut writer_guard = self.updates_writer.lock().await;
            if let Some(old_writer) = writer_guard.take() {
                let finalized = old_writer.finalize(now_ts)?;
                self.emit(Event::ArchiveSegmentFinalized {
                    stream: ArchiveStream::Updates.as_str().to_string(),
                    path: finalized.final_path.display().to_string(),
                    end_ts: finalized.end_ts,
                    records: finalized.record_count,
                });
                if let Some(rep) = &self.replicator {
                    rep.enqueue_segment(&finalized)?;
                }
            }
        }

        self.ensure_updates_writer(now_ts).await
    }

    /// Publishes `event` on the archive event channel, wrapped in an
    /// `EventEnvelope`. The send result is deliberately discarded.
    fn emit(&self, event: Event) {
        let _ = self.event_tx.send(EventEnvelope::new(event));
    }
}

/// Removes every regular file directly inside `tmp_root`.
///
/// Sub-directories are left alone and are not descended into. A missing
/// `tmp_root` is treated as already clean.
///
/// # Errors
///
/// Fails if the directory cannot be listed, an entry cannot be read, or a
/// file cannot be removed.
fn cleanup_tmp_root(tmp_root: &std::path::Path) -> Result<()> {
    if !tmp_root.exists() {
        return Ok(());
    }

    let entries = std::fs::read_dir(tmp_root)
        .with_context(|| format!("failed reading tmp root {}", tmp_root.display()))?;

    for entry in entries {
        let candidate = entry?.path();
        if !candidate.is_file() {
            continue;
        }
        std::fs::remove_file(&candidate)
            .with_context(|| format!("failed removing temp segment {}", candidate.display()))?;
    }

    Ok(())
}